Hi again,
I have just prepared a simplified version of your benchmarking script:
import datetime
import cvb
import os
import cvb.foundation
import threading
import logging
from typing import List, Dict, Any
from enum import IntEnum
class Result:
class MeasurementItem(IntEnum):
Setup = 0
Triggering = 1
Acquisition = 2
def __init__(self):
self.images = []
self.measurement_dict = dict()
for i in [item.name for item in self.MeasurementItem]:
self.measurement_dict[i] = []
def append(self, name: str, entry: Any):
assert name in [item.name for item in self.MeasurementItem]
self.measurement_dict[name].append(entry)
def Stream_(tag: str, device: cvb.Device, node_map, nr_images: int,
result: Result, is_trigger_available: bool = True) -> None:
logging.info("streaming started: device #{}".format(tag))
device.stream().start()
if not is_trigger_available:
logging.info("triggering is omitted; device #{}".format(tag))
else:
trigger_software = node_map["TriggerSoftware"]
count = 0
logging.info("triggering started: device #{}".format(tag))
base = datetime.datetime.now()
while count < nr_images:
trigger_software.execute()
count += 1
elapsed_time = (datetime.datetime.now() - base).total_seconds()
result.append(Result.MeasurementItem.Triggering.name, elapsed_time)
logging.info("triggering completed: device #{}; {}".format(tag, elapsed_time))
logging.info("acquisition started: device #{}".format(tag))
count = 0
base = datetime.datetime.now()
while count < nr_images:
image, _ = device.stream().wait()
count += 1
result.images.append(image)
elapsed_time = get_time_diff(base)
result.append(Result.MeasurementItem.Acquisition.name, elapsed_time)
logging.info("acquisition completed: {}; {} sec for {} images".format(
tag, elapsed_time, nr_images))
device.stream().stop()
logging.info("streaming stopped: device #{}".format(tag))
def store_features(node_map_, feature_names_: List[str]) -> Dict[str, str]:
feature_value_dict = dict()
for name in feature_names_:
feature_value_dict[name] = node_map_[name].value
return feature_value_dict
def restore_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
node_map_[name].value = feature_value_dict[name]
def setup_features(node_map_, feature_value_dict: Dict[str, str]):
for name in feature_value_dict.keys():
node_map_[name] = feature_value_dict[name]
def get_time_diff(base):
return (datetime.datetime.now() - base).total_seconds()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
nr_devices = 2
#is_trigger_available = False
is_trigger_available = True
nr_images_ = 200
feature_snapshot_list = []
devices = []
node_maps = []
streams = []
results = []
for n in range(nr_devices):
results.append(Result())
logging.info("setting up device: device #{}".format(n))
base = datetime.datetime.now()
devices.append(cvb.DeviceFactory.open(
os.path.join(cvb.install_path(), "drivers", "GenICam.vin"), port=n))
node_maps.append(devices[n].node_maps["Device"])
streams.append(devices[n].stream())
streams[n].ring_buffer.change_count(nr_images_, 0)
logging.info("nr buffers: device #{}; {}".format(n, streams[n].ring_buffer.count))
if is_trigger_available:
feature_names = ["TriggerMode", "TriggerSource"]
feature_snapshot_list.append(
store_features(node_maps[n], feature_names))
feature_value_pairs = {
"TriggerMode": "On", "TriggerSource": "TriggerSoftware"}
setup_features(node_maps[n], feature_value_pairs)
elapsed_time = get_time_diff(base)
logging.info("set up completed: device #{}; {} sec".format(
n, elapsed_time))
results[n].append(Result.MeasurementItem.Setup.name, elapsed_time)
threads = []
for n in range(nr_devices):
threads.append(threading.Thread(
target=Stream_(tag=str(n), device=devices[n], node_map=node_maps[n],
result=results[n], nr_images=nr_images_,
is_trigger_available=is_trigger_available)))
for t in threads:
t.start()
for t in threads:
t.join()
if is_trigger_available:
for n in range(nr_devices):
restore_features(node_maps[n], feature_snapshot_list[n])
for n in range(nr_devices):
logging.info(
"images: device #{}; {}".format(
n, [cvb.as_array(image) for image in results[n].images]))
logging.info("{}".format("#### Summary ####"))
for n in range(nr_devices):
logging.info("device #{}".format(n))
for item in [i.name for i in Result.MeasurementItem]:
logging.info(" {}: {}".format(
item, results[n].measurement_dict[item]))
Please prepare two devices and run the script. You will see the summary at the bottom of the message console; specifically, there you will see the time consumed to set up a camera and the time consumed to acquire the required number of images.
INFO:root:#### Summary ####
INFO:root:device #0
INFO:root: Setup: [0.900323]
INFO:root: Triggering: []
INFO:root: Acquisition: [12.465828]
INFO:root:device #1
INFO:root: Setup: [0.881077]
INFO:root: Triggering: []
INFO:root: Acquisition: [12.928772]
In the example above, please note that the camera was the one that does not support external triggers. Hense I just let the camera run by its internal trigger.
When we look at the result, the acquisition is taking about 12 sec: it consists of every cost per image (60ms) * 200 images. And, as I mentioned, the camera does not support external triggers so we have no result in the “Triggering” entry.
By the way, again, I need to clearly admit that I have not tested a camera that supports external triggers including software triggers because I only have a mock/simulator; however, as far as I communicate with you I believe you can fix it by yourself when needed.
Again, you are always welcome; please feel free to come back anytime.