#!/usr/bin/env python3
import glob
import os
import sys
import warnings
import cv2
import numpy as np
import skimage.color
import skimage.filters
import skimage.io
import skimage.measure
import skimage.metrics
import skimage.morphology
import skimage.transform
from numba.typed import List
from pykdtree.kdtree import KDTree
from tqdm import tqdm
from .functions import utils, object_matching, tracer_functions, interpolation, colormap_functions
[docs]class OpFlowLab:
def __init__(self,
main_folder, image_file,
forward_velocity_folder=None, reverse_velocity_folder=None,
object_segmentation_folder=None,
velocity_type="dense", velocity_ext_type="*.bin", image_ext_type="*.tif",
velocity_dtype="float16", kernel_size=5):
self.main_folder = main_folder
self.velocity_type = velocity_type
self.velocity_ext_type = velocity_ext_type
self.image_ext_type = image_ext_type
self.image = skimage.io.imread(image_file)
self.image_shape = self.image.shape[1:]
self.velocity_dtype = velocity_dtype
self.kernel_size = kernel_size
self.forward_velocity_filelist = None
self.forward_velocity_folder = forward_velocity_folder
self.reverse_velocity_filelist = None
self.reverse_velocity_folder = reverse_velocity_folder
self.object_segmentation_filelist = None
self.object_segmentation_folder = object_segmentation_folder
self.length = self.image.shape[0]
if forward_velocity_folder is not None:
self.initialize_forward_velocities(forward_velocity_folder)
if reverse_velocity_folder is not None:
self.initialize_reverse_velocities(self.reverse_velocity_folder)
if object_segmentation_folder is not None:
self.initialize_segmentation_folder(self.object_segmentation_folder)
# TODO: Perform checks to ensure that all the filelists are of the correct length
[docs] def parse_folder(self, folder):
"""
Appends the main folder director to the folder name given
Parameters
----------
folder : str
Name of folder
Returns
-------
folder : str
Full path string containing the main folder path along with the provided folder
"""
if os.path.isdir(folder):
pass
else:
folder = os.path.join(self.main_folder, folder)
return folder
[docs] def initialize_forward_velocities(self, folder, velocity_type=None, velocity_ext_type=None):
"""
Initializes the internal folder path and file lists for the forward velocities
Parameters
----------
folder : str
Either full path of the directory or the name of the folder in the main path
velocity_type : str or None
Specify if the velocity field is a dense or sparse velocity field
velocity_ext_type : str or None
Specify the extension of file containing the velocity field
Returns
-------
"""
if velocity_type is None:
velocity_type = self.velocity_type
if velocity_ext_type is None:
velocity_ext_type = self.velocity_ext_type
self.forward_velocity_folder = self.parse_folder(folder)
self.forward_velocity_filelist = utils.load_velocity_filelist(self.forward_velocity_folder,
velocity_type=velocity_type,
file_ext=velocity_ext_type,
reverse_sort=False)
[docs] def initialize_reverse_velocities(self, folder, velocity_type=None, velocity_ext_type=None):
"""
Initializes the internal folder path and file lists for the forward velocities
Parameters
----------
folder : str
Either full path of the directory or the name of the folder in the main path
velocity_type : str or None
Specify if the velocity field is a dense or sparse velocity field
velocity_ext_type : str or None
Specify the extension of file containing the velocity field
Returns
-------
"""
if velocity_type is None:
velocity_type = self.velocity_type
if velocity_ext_type is None:
velocity_ext_type = self.velocity_ext_type
self.reverse_velocity_folder = self.parse_folder(folder)
self.reverse_velocity_filelist = utils.load_velocity_filelist(self.reverse_velocity_folder,
velocity_type=velocity_type,
file_ext=velocity_ext_type,
reverse_sort=True)
[docs] def initialize_segmentation_folder(self, folder, img_ext_type=None):
"""
Initializes the internal folder path and file lists for the segmented images
Parameters
----------
folder : str
Either full path of the directory or the name of the folder in the main path
img_ext_type : str or None
Specify the extension of the image file
Returns
-------
"""
if img_ext_type is None:
img_ext_type = self.image_ext_type
self.object_segmentation_folder = self.parse_folder(folder)
self.object_segmentation_filelist = glob.glob(os.path.join(self.object_segmentation_folder, img_ext_type))
self.object_segmentation_filelist.sort()
[docs] def load_velocities(self, filelist, img_no):
"""
Helper function to call the correct loading function based on the type of vectors as specified in
self._vector_type
Parameters
----------
filelist : (list, list) or (list,)
List of files to be loaded
img_no : int
Index to indicate which file number to load
Returns
-------
velocity_y : 2D numpy array
Array containing the y component of the vectors from motion estimation analysis
velocity_x : 2D numpy array
Array containing the x component of the vectors from motion estimation analysis
See Also
--------
utils.load_vectors_bin: Function to load vector information from optical flow generated \*.bin files
utils.load_vectors_piv: Function to load vector information from PIV generated \*.mat files
"""
velocity_x = None
velocity_y = None
if self.velocity_type == 'dense':
velocity_x = utils.load_velocity_bin(filelist[0][img_no], self.image_shape,
bin_dtype=self.velocity_dtype, kernel_size=self.kernel_size)
velocity_y = utils.load_velocity_bin(filelist[1][img_no], self.image_shape,
bin_dtype=self.velocity_dtype, kernel_size=self.kernel_size)
elif self.velocity_type == 'sparse':
velocity_x, velocity_y, x_array, y_array = utils.load_velocity_piv(filelist[0][0], img_no)
data = np.c_[y_array.ravel(), x_array.ravel()]
tree = KDTree(data, leafsize=16)
x_pos, y_pos = np.meshgrid(np.arange(self.image_shape[1]), np.arange(self.image_shape[0]))
pts = np.array(np.c_[y_pos.ravel(), x_pos.ravel()], dtype=np.float32)
distances, indices = tree.query(pts, k=5, distance_upper_bound=32.0)
pts = np.rint(pts).astype(np.int32)
coords = np.c_[velocity_y.ravel(), velocity_x.ravel()]
piv_x_interpolate = np.zeros(self.image_shape)
piv_y_interpolate = np.zeros(self.image_shape)
velocity_x, velocity_y = interpolation.inverse_distance_weighting(pts, distances, indices, coords,
piv_x_interpolate, piv_y_interpolate)
return velocity_x, velocity_y
[docs] def flowMatch_function(self,
output_folder="FlowMatch",
pairwise_threshold_distance=10,
min_object_size=None, max_object_size=None,
):
"""
Wrapper function that calls :func:`~flowMatch_iteration` for each individual pair of frames in the
image stack.
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store the interpolated vector fields
pairwise_threshold_distance : int
Distance between the propagated centroid location and the actual centroid location that is used as a cutoff for flowMatch
min_object_size : int or None
Minimum object size for it to be included in the matching process. Set to None to ignore this cutoff
max_object_size : int or None
Maximum object size for it to be included in the matching process. Set to None to ignore this cutoff
Returns
-------
total_number_list : list
List containing the total number of objects that are eligible for object matching for each frame
matched_number_list : list
List containing the total number of matched object for each frame
See Also
--------
flowMatch_iteration: function used to perform flowMatch for a single frame
"""
# create output folders
flowx_output_directory = os.path.join(self.main_folder, output_folder, 'flowx')
os.makedirs(flowx_output_directory, exist_ok=True)
flowy_output_directory = os.path.join(self.main_folder, output_folder, 'flowy')
os.makedirs(flowy_output_directory, exist_ok=True)
total_number_list = []
matched_number_list = []
for img_no in tqdm(range(self.length - 1), file=sys.stdout):
total_number, matched_number = self.flowMatch_iteration(img_no,
flowx_output_directory,
flowy_output_directory,
pairwise_threshold_distance=pairwise_threshold_distance,
min_object_size=min_object_size,
max_object_size=max_object_size,
)
total_number_list.append(total_number)
matched_number_list.append(matched_number)
return total_number_list, matched_number_list
[docs] def flowMatch_iteration(self,
frame_no,
flowx_output_directory, flowy_output_directory,
pairwise_threshold_distance=10,
min_object_size=10, max_object_size=None,
save_dtype="float16",
save_velocity_as_tif=False,
):
"""
Perform flowMatch of segmented objects using the provided motion estimation vector information for a
single iteration and outputs an interpolated vector field using the velocity obtained from the matched objects.
Parameters
----------
frame_no : int
Frame number with which to load vector
flowx_output_directory : str
Path to folder that will be used to store the x component of the interpolated vector field
flowy_output_directory : str
Path to folder that will be used to store the y component of the interpolated vector field
pairwise_threshold_distance : int
Distance between the propagated centroid location and the actual centroid location that is used as a cutoff for object matching
min_object_size : int or None
Minimum object size for it to be included in the matching process. Set to None to ignore this cutoff
max_object_size : int or None
Maximum object size for it to be included in the matching process. Set to None to ignore this cutoff
save_dtype : str
Save vectors as either float16 or float32 file
save_velocity_as_tif : bool
Save as float32 tif file instead of a float16 binary file
Returns
-------
total_number : int
Total number of objects that are eligible for object matching for each frame
matched_number : int
Total number of matched object for each frame
See Also
--------
"""
# obtain initial centroids
initial_image = skimage.io.imread(self.object_segmentation_filelist[frame_no])
if initial_image.shape != self.image_shape:
print("Label image is not the same size as the input image. Resizing label image.")
initial_image = cv2.resize(initial_image, (self.image_shape[1], self.image_shape[0]),
interpolation=cv2.INTER_NEAREST)
initial_object_properties = skimage.measure.regionprops(initial_image)
initial_properties = List()
for prop in initial_object_properties:
if min_object_size is None:
min_object_size = 0
if max_object_size is None:
max_object_size = np.inf
if min_object_size < prop["area"] < max_object_size:
initial_properties.append((prop["label"], prop["local_centroid"], prop["centroid"], prop["orientation"],
prop["major_axis_length"], np.array(prop["coords"], order='C', copy=True),
object_matching.calculate_contour(prop["image"])))
counter = 0
total_number = len(initial_properties)
matched_flow_x = np.full(self.image_shape, np.nan)
matched_flow_y = np.full(self.image_shape, np.nan)
centroid_list = [x[2] for x in initial_properties]
# obtain target centroids
target_image = skimage.io.imread(self.object_segmentation_filelist[frame_no + 1])
if target_image.shape != self.image_shape:
target_image = cv2.resize(target_image, (self.image_shape[1], self.image_shape[0]),
interpolation=cv2.INTER_NEAREST)
target_object_properties = skimage.measure.regionprops(target_image)
target_properties = List()
for prop in target_object_properties:
if min_object_size is None:
min_object_size = 0
if max_object_size is None:
max_object_size = np.inf
if min_object_size < prop["area"] < max_object_size:
target_properties.append((prop["label"], prop["local_centroid"], prop["centroid"], prop["orientation"],
prop["major_axis_length"], np.array(prop["coords"], order='C', copy=True),
object_matching.calculate_contour(prop["image"])))
# load forward vectors
forward_velocity_x, forward_velocity_y = self.load_velocities(self.forward_velocity_filelist, frame_no)
# load reverse vectors
reverse_velocity_x, reverse_velocity_y = self.load_velocities(self.reverse_velocity_filelist, frame_no)
remap_x, remap_y = interpolation.inverse_mapping(reverse_velocity_x, reverse_velocity_y, self.image_shape)
remapped_target_image = cv2.remap(target_image, remap_x.astype(np.float32), remap_y.astype(np.float32),
cv2.INTER_NEAREST)
forward_pair_list = object_matching.get_pairs(initial_properties, remapped_target_image)
forward_pair_list = forward_pair_list[~np.isnan(forward_pair_list)]
forward_pair_list = forward_pair_list.astype(np.int32)
remap_x, remap_y = interpolation.inverse_mapping(forward_velocity_x, forward_velocity_y, self.image_shape)
remapped_initial_image = cv2.remap(initial_image, remap_x.astype(np.float32), remap_y.astype(np.float32),
cv2.INTER_NEAREST)
reverse_pair_list = object_matching.get_pairs(target_properties, remapped_initial_image)
reverse_pair_list = reverse_pair_list[~np.isnan(reverse_pair_list)]
reverse_pair_list = reverse_pair_list.astype(np.int32)
forward_unique, forward_index, forward_bincount = np.unique(forward_pair_list, return_index=True,
return_counts=True)
reverse_unique, reverse_index, reverse_bincount = np.unique(reverse_pair_list, return_index=True,
return_counts=True)
diff_search = np.max(target_image) - len(target_properties)
selected_initial_mask = np.ones(len(initial_properties), np.bool8)
selected_target_mask = np.ones(len(target_properties), np.bool8)
matched_flow_x, matched_flow_y, selected_initial_mask, selected_target_mask = object_matching.calculate_matched_velocities(
initial_properties,
target_properties,
reverse_pair_list,
forward_index,
forward_unique,
forward_bincount,
reverse_unique,
reverse_bincount,
diff_search,
matched_flow_x,
matched_flow_y,
selected_initial_mask,
selected_target_mask,
)
counter += np.count_nonzero(~selected_initial_mask)
initial_properties = object_matching.remove_items(initial_properties, selected_initial_mask)
target_properties = object_matching.remove_items(target_properties, selected_target_mask)
unselected_initial = np.array([x[2] for x in initial_properties])
unselected_target = np.array([x[2] for x in target_properties])
# Round 2
forward_velocity_x_smooth = skimage.filters.gaussian(forward_velocity_x, 15, preserve_range=True)
forward_velocity_y_smooth = skimage.filters.gaussian(forward_velocity_y, 15, preserve_range=True)
reverse_velocity_x_smooth = skimage.filters.gaussian(reverse_velocity_x, 15, preserve_range=True)
reverse_velocity_y_smooth = skimage.filters.gaussian(reverse_velocity_y, 15, preserve_range=True)
selected_initial_mask = np.ones(len(unselected_initial), np.bool8)
selected_target_mask = np.ones(len(unselected_target), np.bool8)
# perform backward propagation of target centroids
forward_propagated_centroids = tracer_functions.update_tracer(unselected_initial, forward_velocity_y_smooth,
forward_velocity_x_smooth)
# perform backward propagation of target centroids
backward_propagated_centroids = tracer_functions.update_tracer(unselected_target, reverse_velocity_y_smooth,
reverse_velocity_x_smooth)
# perform nuclei matching by propagating the nuclei centroids with the flow vectors
forward_difference = object_matching.fast_distance_function(unselected_target, forward_propagated_centroids)
backward_difference = object_matching.fast_distance_function(unselected_initial,
backward_propagated_centroids).T
raw_difference = object_matching.fast_distance_function(unselected_target, unselected_initial)
for initial_index in range(forward_difference.shape[0]):
i = np.argsort(forward_difference[initial_index, :])[0]
k = np.argsort(backward_difference[initial_index, :])[0]
j = np.argsort(raw_difference[initial_index, :])[0]
if i == j and forward_difference[initial_index, i] < pairwise_threshold_distance:
target_index = j
coords, output_velocity = object_matching.single_object_velocity(initial_properties[initial_index],
target_properties[target_index])
if coords is not None:
coords = np.rint(coords).astype(np.int32)
matched_flow_x[coords[:, 0], coords[:, 1]] = output_velocity[:, 1]
matched_flow_y[coords[:, 0], coords[:, 1]] = output_velocity[:, 0]
selected_initial_mask[initial_index] = 0
selected_target_mask[target_index] = 0
counter += 1
elif k == j and backward_difference[initial_index, k] < pairwise_threshold_distance:
target_index = j
coords, output_velocity = object_matching.single_object_velocity(initial_properties[initial_index],
target_properties[target_index])
if coords is not None:
coords = np.rint(coords).astype(np.int32)
matched_flow_x[coords[:, 0], coords[:, 1]] = output_velocity[:, 1]
matched_flow_y[coords[:, 0], coords[:, 1]] = output_velocity[:, 0]
selected_initial_mask[initial_index] = 0
selected_target_mask[target_index] = 0
counter += 1
unselected_initial2 = unselected_initial[selected_initial_mask]
unselected_target2 = unselected_target[selected_target_mask]
initial_properties = object_matching.remove_items(initial_properties, selected_initial_mask)
target_properties = object_matching.remove_items(target_properties, selected_target_mask)
# Round 3
magnitude = np.sqrt(np.array(matched_flow_x) ** 2 + np.array(matched_flow_y) ** 2)
mean_magnitude = np.nanmean(magnitude)
std_magnitude = np.nanstd(magnitude)
velocity_threshold_distance = mean_magnitude + 2 * std_magnitude
while True:
old_counter = counter
final_difference = object_matching.fast_distance_function(unselected_target2, unselected_initial2)
difference_mask = final_difference < velocity_threshold_distance
pairs = np.sum(difference_mask, axis=0)
indices = np.where(pairs == 1)[0]
selected_initial_mask = np.ones(len(unselected_initial2), np.bool8)
selected_target_mask = np.ones(len(unselected_target2), np.bool8)
for target_index in indices:
initial_index = np.where(difference_mask[:, target_index] == 1)[0][0]
coords, output_velocity = object_matching.single_object_velocity(initial_properties[initial_index],
target_properties[target_index])
if coords is not None:
coords = np.rint(coords).astype(np.int32)
matched_flow_x[coords[:, 0], coords[:, 1]] = output_velocity[:, 1]
matched_flow_y[coords[:, 0], coords[:, 1]] = output_velocity[:, 0]
selected_initial_mask[initial_index] = 0
selected_target_mask[target_index] = 0
counter += 1
unselected_initial2 = unselected_initial2[selected_initial_mask]
unselected_target2 = unselected_target2[selected_target_mask]
initial_properties = object_matching.remove_items(initial_properties, selected_initial_mask)
target_properties = object_matching.remove_items(target_properties, selected_target_mask)
# print(len(unselected_initial2), len(unselected_target2))
final_difference = object_matching.fast_distance_function(unselected_target2, unselected_initial2)
difference_mask = final_difference < velocity_threshold_distance
pairs = np.sum(difference_mask, axis=1)
indices = np.where(pairs == 1)[0]
selected_initial_mask = np.ones(len(unselected_initial2), np.bool8)
selected_target_mask = np.ones(len(unselected_target2), np.bool8)
for initial_index in indices:
target_index = np.where(difference_mask[initial_index, :] == 1)[0][0]
coords, output_velocity = object_matching.single_object_velocity(initial_properties[initial_index],
target_properties[target_index])
if coords is not None:
coords = np.rint(coords).astype(np.int32)
matched_flow_x[coords[:, 0], coords[:, 1]] = output_velocity[:, 1]
matched_flow_y[coords[:, 0], coords[:, 1]] = output_velocity[:, 0]
selected_initial_mask[initial_index] = 0
selected_target_mask[target_index] = 0
counter += 1
unselected_initial2 = unselected_initial2[selected_initial_mask]
unselected_target2 = unselected_target2[selected_target_mask]
initial_properties = object_matching.remove_items(initial_properties, selected_initial_mask)
target_properties = object_matching.remove_items(target_properties, selected_target_mask)
# print(len(unselected_initial2), len(unselected_target2))
if old_counter == counter:
break
selected_centroid, counts = np.unique(np.vstack((np.array(centroid_list), unselected_initial2)), axis=0,
return_counts=True)
selected_centroid = selected_centroid[counts == 1]
matched_y_pos, matched_x_pos = selected_centroid.T
cen_round = np.rint(selected_centroid).astype(np.int32)
flow_x_cen = matched_flow_x.ravel()[np.ravel_multi_index(cen_round.T, self.image_shape)]
flow_y_cen = matched_flow_y.ravel()[np.ravel_multi_index(cen_round.T, self.image_shape)]
data = np.c_[matched_y_pos.astype(np.float32).ravel(), matched_x_pos.astype(np.float32).ravel()]
tree = KDTree(data, leafsize=16)
nanpos = np.argwhere(np.isnan(matched_flow_x))
y_pos = nanpos[:, 0]
x_pos = nanpos[:, 1]
pts = np.array(np.c_[y_pos.ravel(), x_pos.ravel()], dtype=np.float32)
distances, indices = tree.query(pts, k=10, distance_upper_bound=64.0)
values = np.c_[flow_y_cen, flow_x_cen]
matched_flow_x, matched_flow_y = interpolation.inverse_distance_weighting(pts.astype(np.int32),
distances, indices,
values,
matched_flow_x,
matched_flow_y)
# x component
utils.save_output(matched_flow_x,
os.path.join(flowx_output_directory, 'flowx_{:03d}_float16'.format(frame_no + 1)),
save_velocity_as_tif=save_velocity_as_tif, save_dtype=save_dtype)
# y component
utils.save_output(matched_flow_y,
os.path.join(flowy_output_directory, 'flowy_{:03d}_float16'.format(frame_no + 1)),
save_velocity_as_tif=save_velocity_as_tif, save_dtype=save_dtype)
return total_number, counter
[docs] def flowWarp_function(self,
output_folder="FlowWarp",
start_frame=0,
):
"""
Wrapper function that calls :func:`~flowWarp_iteration` for each individual pair of frames in the
image stack.
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store the interpolated vector fields
start_frame : int
Specify the starting frame to calculate flowWarp
See Also
--------
flowWarp_iteration: function used to perform flowWarp for a single frame
"""
transformation_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(transformation_folder, exist_ok=True)
for frame_no in tqdm(range(start_frame, self.length - 1)):
tracer_pos, _ = self.flowWarp_iteration(frame_no, transformation_folder)
[docs] def flowWarp_iteration(self,
frame_no,
transformation_folder):
"""
Performs flowWarp of an image using the calculated velocity field for that image and calculate the normalised
root mean squared error between it and the next image in the sequence.
Parameters
----------
frame_no : int
Frame number with which to load vector
transformation_folder : str
Path to folder that will be used to store the x component of the interpolated vector field
Returns
-------
nrmse : int
Returns the normalized root mean square error between the warped image and the ground truth image
output_image : int
Returns the warped image
See Also
--------
"""
flow_x, flow_y = self.load_velocities(self.forward_velocity_filelist, frame_no)
img_frame = self.image[frame_no]
map1_inverse, map2_inverse = interpolation.inverse_mapping(flow_x, flow_y, self.image_shape)
remapped_image = cv2.remap(img_frame, map1_inverse.astype(np.float32),
map2_inverse.astype(np.float32), cv2.INTER_CUBIC)
output_image = np.dstack((self.image[frame_no + 1], remapped_image))
output_image = np.moveaxis(output_image, -1, 0)
skimage.io.imsave(os.path.join(transformation_folder, 'output_{:03d}.tif'.format(frame_no)), output_image,
check_contrast=False, metadata={'mode': 'composite'}, imagej=True)
nrmse = skimage.metrics.normalized_root_mse(self.image[frame_no + 1], remapped_image)
return nrmse, output_image
[docs] def flowPath_function(self,
output_folder="FlowPath",
start_frame=0, frames=10,
alpha=0,
spacing=8,
max_tracers=None,
):
"""
Wrapper function that calls :func:`~flowPath_iteration` for each frames in the image stack.
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store the vector trace images
start_frame : int
Frame number with which to start vector trace visualization
frames : int
Length of trails in number of frames that will be drawn for each artificial tracer
alpha : int
Specifies the alpha value for the flowPath overlay over the original image
spacing : int
Grid spacing used to initialize tracers
max_tracers : int or None
Maximum number of tracers to be used in the calculation [Currently unused]
Returns
-------
None
See Also
--------
initialize_flow_path : function used to initialize tracers
flow_path_iteration : function used to perform vector trace visualization for a single frame
"""
# create output folders
flow_trace_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(flow_trace_folder, exist_ok=True)
tracer_pos, all_pos_index, x_array_shape = self.flowPath_initialization(frames=frames,
spacing=spacing,
max_tracers=max_tracers)
for frame_no in tqdm(range(start_frame, self.length - 1)):
img_frame = None
if alpha != 0:
img_frame = self.image[frame_no]
tracer_pos, _, _ = self.flowPath_iteration(frame_no, tracer_pos, all_pos_index, x_array_shape,
flow_trace_folder,
alpha=alpha,
img_frame=img_frame,
frames=frames,
spacing=spacing,
)
[docs] def flowPath_initialization(self,
frames=10,
spacing=8,
max_tracers=None,
):
"""
Initializes the tracers that will be used for vector trace visualization
Parameters
----------
frames : int
Length of trails in number of frames that will be drawn for each artificial tracer
spacing : int
Grid spacing used to initialize tracers.
max_tracers : int or None
Maximum number of tracers to be used in the calculation [Currently unused]
Returns
-------
tracer_pos : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
all_pos_index: iter
Iterable that outputs the tracer number
x_array_shape: (int, int)
Tuple containing the shape of the grid used to generate tracers
See Also
--------
flowPath_iteration: function used to perform vector trace visualization for a single frame
"""
if max_tracers is None:
max_tracers = self.image_shape[0] * self.image_shape[1]
tracer_pos = np.full((frames, max_tracers, 2), fill_value=np.nan)
x = np.arange(spacing // 2, self.image_shape[1], spacing, dtype=np.float32)
y = np.arange(spacing // 2, self.image_shape[0], spacing, dtype=np.float32)
x_array, y_array = np.meshgrid(x, y)
x_array_shape = x_array.shape
all_pos_index = np.arange(x_array_shape[0] * x_array_shape[1])
tracer_pos[0, :x_array.flatten().shape[0], 1] = x_array.flatten()
tracer_pos[0, :y_array.flatten().shape[0], 0] = y_array.flatten()
return tracer_pos, all_pos_index, x_array_shape
[docs] def flowPath_iteration(self,
frame_no, tracer_pos, all_pos_index, x_array_shape,
flow_trace_folder,
alpha=0,
img_frame=None,
frames=10,
spacing=8,
colormap=None,
colormap_min=None,
colormap_max=None,
metric_type="angle",
):
"""
Calculates and outputs the flowPath for a single iteration based on the estimated velocity field.
Parameters
----------
frame_no : int
Frame number with which to perform vector trace visualization
tracer_pos : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
all_pos_index: iter
Iterable that outputs the tracer number
x_array_shape: (int, int)
Tuple containing the shape of the grid used to generate tracers
flow_trace_folder : str
Folder name that will be created in the main directory to store the vector trace images
alpha : int
Specifies the alpha value for the flowPath overlay over the original image
frames : int
Length of trails in number of frames that will be drawn for each artificial tracer
spacing : int
Grid spacing used to initialize tracers
colormap : Colormap, str, or None
Colormap to be used to colorize the flowPaths
colormap_min, colormap_max : float
Minimum and/or maximum value that is used to determine the normalization of the colormap
metric_type : str
Type of metric that is used to colorize the flowPaths
Returns
-------
tracer_pos : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
output_image : 3D array of shape (m, n, 3)
RGB image showing the tails of the tracers
See Also
--------
flowPath_initialization : function used to initialize tracers
"""
# load velocity
flow_x, flow_y = self.load_velocities(self.forward_velocity_filelist, frame_no)
temp_tracer_pos = tracer_functions.update_tracer(tracer_pos[0, ...], flow_y, flow_x)
tracer_pos = np.delete(tracer_pos, -1, axis=0)
tracer_pos = np.insert(tracer_pos, 0, temp_tracer_pos, axis=0)
# Identify valid tracers to draw
draw_pos = np.any(~np.isnan(tracer_pos), axis=(0, 2))
draw_tracer_pos = tracer_pos[:, draw_pos, :]
draw_tracer_pos = np.rint(draw_tracer_pos)
# Identify empty locations
filled_pos = tracer_pos[:(frames // 2), ...][~np.all(np.isnan(tracer_pos[:(frames // 2), ...]), axis=2)]
filled_pos = np.floor_divide(filled_pos, spacing)
filled_index = np.ravel_multi_index(np.array(filled_pos, dtype=np.int32).T, x_array_shape, mode='clip')
diff_index = np.setdiff1d(all_pos_index, filled_index)
# fill in additional tracers
empty_pos = np.where(np.all(np.isnan(tracer_pos), axis=(0, 2)))[0]
for i, index in enumerate(diff_index):
y, x = np.unravel_index(index, x_array_shape)
tracer_pos[0, empty_pos[i], :] = [y * spacing + spacing // 2, x * spacing + spacing // 2]
# catch and ignore warning when the values are all nan
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
x_diff = np.nanmean(draw_tracer_pos[-2::-1, :, 1] - draw_tracer_pos[-1:0:-1, :, 1], axis=0)
y_diff = np.nanmean(draw_tracer_pos[-2::-1, :, 0] - draw_tracer_pos[-1:0:-1, :, 0], axis=0)
# obtain colors for drawing
_colormap = colormap_functions.parse_colormap(colormap)
colormap_type = "bar"
if metric_type.lower() == "angle":
metric = np.arctan2(y_diff, -x_diff, where=~(np.isnan(x_diff) | np.isnan(y_diff))) * 180 / np.pi
colormap_min = -180
colormap_max = 180
colormap_type = "wheel"
elif metric_type.lower() == "speed":
metric = np.sqrt(y_diff**2 + x_diff**2, where=~(np.isnan(x_diff) | np.isnan(y_diff)))
elif metric_type.lower() == "vx":
metric = np.where(~(np.isnan(x_diff) | np.isnan(y_diff)), x_diff, 0)
elif metric_type.lower() == "vy":
metric = np.where(~(np.isnan(x_diff) | np.isnan(y_diff)), y_diff, 0)
colors, colormap_min, colormap_max = colormap_functions.get_colors(metric, _colormap, colormap_min=colormap_min, colormap_max=colormap_max)
if alpha != 0:
output_image = skimage.color.gray2rgb(alpha * utils.imadjust(img_frame))
output_image = output_image/np.max(output_image)
output_image = skimage.util.img_as_ubyte(output_image)
else:
output_image = np.ones(self.image_shape + (3,), dtype=np.uint8) * 101
for i in range(0, draw_tracer_pos.shape[1], 1):
points = draw_tracer_pos[np.newaxis, :, i, ::-1] # opencv takes (x,y)
nan_pos = np.all(~np.isnan(points), axis=-1)
points = np.array(points[np.newaxis, nan_pos, :], dtype=np.int32)
output_image = cv2.polylines(output_image, points, isClosed=False, color=colors[i, :3] * 255)
skimage.io.imsave(os.path.join(flow_trace_folder, 'output_{:03d}.tif'.format(frame_no)), output_image,
check_contrast=False)
if colormap_type == "wheel":
colormap_vis = colormap_functions.save_colorwheel(_colormap,
filepath=os.path.join(flow_trace_folder, "colorwheel_{:03d}.png".format(frame_no)), )
elif colormap_type == "bar":
colormap_vis = colormap_functions.save_colorbar(_colormap,
filepath=os.path.join(flow_trace_folder, "colorbar_{:03d}.png".format(frame_no)),
colormap_min=colormap_min, colormap_max=colormap_max, )
return tracer_pos, output_image, colormap_vis
[docs] def flowTracer_function(self,
output_folder="FlowTracer",
save_initial_tracer_pos=True,
use_random_tracers=True,
tracer_no=10000,
spacing=16,
start_frame=0,
):
"""
Iterates and plots positions of artificial tracers based on provided motion estimation vectors
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store the artificial tracer images
save_initial_tracer_pos : Bool
Save initial tracer positions
use_random_tracers : Bool
If true, generate tracer positions based on a regular grid. If false, generate random position for tracers.
tracer_no : int
Maximum number of artificial tracers to use
start_frame : int
Frame number with which to start vector trace visualization
spacing : int
Grid spacing used to initialize tracers
start_frame : int
Frame number with which to start vector trace visualization
Returns
-------
See Also
--------
"""
tracer_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(tracer_folder, exist_ok=True)
tracer_centroids, color = self.flowTracer_initialization(save_initial_tracer_pos=save_initial_tracer_pos,
use_random_tracers=use_random_tracers,
tracer_no=tracer_no,
spacing=spacing,
)
# save first image
self.flowTracer_first_frame(tracer_folder,
start_frame, tracer_centroids, color,
)
for frame_no in tqdm(range(start_frame, self.length - 1)):
tracer_centroids, output_image = self.flowTracer_iteration(tracer_folder,
frame_no, tracer_centroids, color,
)
[docs] def flowTracer_initialization(self,
save_initial_tracer_pos=True,
use_random_tracers=True,
tracer_no=10000,
spacing=16,
segmentation_filelist=None,
):
"""
Initializes the tracers that will be used for vector trace visualization
Parameters
----------
save_initial_tracer_pos : bool
Set to True to save initial positions
use_random_tracers : bool
Set to True to use tracers with randomly defined starting x,y positions
spacing : int
Grid spacing used to initialize tracers.
tracer_no : int or None
Maximum number of tracers to be used in the calculation [Currently unused]
segmentation_filelist : list
List containing the segmented images. Used for setting the location of the intial flowTracers to the center
of the segmented objects.
Returns
-------
tracer_pos : 2D array of shape (tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
all_pos_index: iter
Iterable that outputs the tracer number
x_array_shape: (int, int)
Tuple containing the shape of the grid used to generate tracers
See Also
--------
"""
if segmentation_filelist is not None:
initial_image = skimage.io.imread(self.object_segmentation_filelist[0])
if initial_image.shape != self.image_shape:
print("Label image is not the same size as the input image. Resizing label image.")
initial_image = cv2.resize(initial_image, (self.image_shape[1], self.image_shape[0]),
interpolation=cv2.INTER_NEAREST)
# initial_image = object_matching.remove_objects(initial_image, 100, 2000)
initial_object_properties = skimage.measure.regionprops(initial_image)
initial_object_centroids = np.array([np.array(x['Centroid']) for x in initial_object_properties])
tracer_x = np.array(initial_object_centroids[:, 1], dtype=np.float32)
tracer_y = np.array(initial_object_centroids[:, 0], dtype=np.float32)
elif use_random_tracers:
tracer_x = np.array(np.random.randint(1, self.image_shape[1], tracer_no), dtype=np.float32)
tracer_y = np.array(np.random.randint(1, self.image_shape[0], tracer_no), dtype=np.float32)
else:
x = np.arange(spacing // 2, self.image_shape[1], spacing, dtype=np.float32)
y = np.arange(spacing // 2, self.image_shape[0], spacing, dtype=np.float32)
x_array, y_array = np.meshgrid(x, y)
tracer_x = x_array.flatten()
tracer_y = y_array.flatten()
tracer_centroids = np.array((tracer_y, tracer_x)).T
color = np.array([colormap_functions.create_color() for _ in range(tracer_centroids.shape[0])])
if save_initial_tracer_pos:
import pickle
tracer_coord = {'tracer_x': tracer_x,
'tracer_y': tracer_y,
'color': color,
}
with open(os.path.join(self.main_folder, 'tracer.pkl'), 'wb') as file:
pickle.dump(tracer_coord, file)
return tracer_centroids, color
[docs] def flowTracer_first_frame(self,
tracer_folder,
frame_no, tracer_centroids, color,
radius=3,
):
"""
Plots and draws the position of the tracers in the first frame
Parameters
----------
tracer_folder : str
Folder name that will be created in the main directory to store the flowTracer images
frame_no : int
Frame number with which to perform vector trace visualization
tracer_centroids : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
color : 2D array
Array containing the color information for each tracer
radius : int
Specifies the radius of the drawn circle
Returns
-------
output_image : 3D array of shape (m, n, 3)
RGB image showing the position of tracers
See Also
--------
"""
first_image = skimage.util.img_as_float32(utils.imadjust(self.image[frame_no, ...]))
first_image = skimage.color.gray2rgb(first_image)
output_image = tracer_functions.draw_tracer(first_image, tracer_centroids, color, radius=radius)
skimage.io.imsave(os.path.join(tracer_folder, 'output_{:03d}.tif'.format(frame_no)),
skimage.util.img_as_ubyte(output_image))
return output_image
[docs] def flowTracer_iteration(self,
tracer_folder,
frame_no, tracer_centroids, color,
radius=3,
):
"""
Calculates and outputs flowTracer for a single iteration
Parameters
----------
tracer_folder : str
Folder name that will be created in the main directory to store the flowTracer images
frame_no : int
Frame number with which to perform vector trace visualization
tracer_centroids : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
color : 2D array
Array containing the color information for each tracer
radius : int
Specifies the radius of the drawn circle
Returns
-------
tracer_centroids : 3D array of shape (frames, tracer_no, 2)
Array containing the positions of the tracers in the format [y,x]
output_image : 3D array of shape (m, n, 3)
RGB image showing the position of tracers
See Also
--------
flowTracer_initialization : function used to initialize tracers
"""
flow_x, flow_y = self.load_velocities(self.forward_velocity_filelist, frame_no)
tracer_centroids = tracer_functions.update_tracer(tracer_centroids, flow_y, flow_x)
# Identify valid tracers to draw
draw_pos = np.any(~np.isnan(tracer_centroids), axis=1)
draw_tracer_pos = tracer_centroids[draw_pos, :]
draw_colors = color[draw_pos]
draw_tracer_pos = np.rint(draw_tracer_pos)
output_image = skimage.util.img_as_float32(utils.imadjust(self.image[frame_no+1, ...]))
output_image = tracer_functions.draw_tracer(skimage.color.gray2rgb(output_image),
draw_tracer_pos, draw_colors, radius=radius)
skimage.io.imsave(os.path.join(tracer_folder, 'output_{:03d}.tif'.format(frame_no + 1)),
skimage.util.img_as_ubyte(output_image))
return tracer_centroids, output_image
[docs] def flowDerivative_iteration(self,
output_folder, frame_no,
stat_matrix=None,
file_string=None,
smoothing=None,
colormap=None,
colormap_min=None, colormap_max=None,
):
"""
Calculates gradients and output the various flow derivatives for a single iteration
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store flow derivative images
frame_no : int
Frame number with which to calculate flow derivatives
stat_matrix : 2D array of shape (2, 2)
Array containing formulation of
file_string: str
File name in the format '[name]_{:03d}.tif'
colormap : str or None
Colormap to plot
smoothing : int
Size of gaussian smoothing kernel
colormap_min, colormap_max : float
Minimum and/or maximum value that is used to determine the normalization of the colormap
Returns
-------
output_image : 2D array
Output image containing the colorized flow derivative
See Also
--------
flowCurl_function : wrapper function to plot curl for all frames
flowDivergence_function : wrapper function to plot divergence for all frames
flowShear_function : wrapper function to plot simple shear for all frames
"""
_colormap = colormap_functions.parse_colormap(colormap)
flow_x, flow_y = self.load_velocities(self.forward_velocity_filelist, frame_no)
if smoothing is not None:
from scipy.ndimage import gaussian_filter
flow_x = gaussian_filter(flow_x, smoothing)
flow_y = gaussian_filter(flow_y, smoothing)
dflowx_dy, dflowx_dx = np.gradient(flow_x)
dflowy_dy, dflowy_dx = np.gradient(flow_y)
# negative sign needed due to the fact that the yaxis is inverted
stat = -np.array([[dflowx_dy, dflowx_dx], [dflowy_dy, dflowy_dx]]) * stat_matrix[..., np.newaxis, np.newaxis]
stat = np.sum(stat, axis=(0, 1))
if _colormap is None:
skimage.io.imsave(os.path.join(output_folder, file_string.format(frame_no)), stat,
check_contrast=False)
colormap_vis = None
else:
output_image, colormap_min, colormap_max = colormap_functions.get_colors(stat, _colormap, colormap_min=colormap_min, colormap_max=colormap_max)
output_image = output_image[..., :3] # remove alpha layer
output_image = skimage.util.img_as_ubyte(output_image)
skimage.io.imsave(os.path.join(output_folder, file_string.format(frame_no)), output_image,
check_contrast=False)
stat = output_image
colormap_vis = colormap_functions.save_colorbar(_colormap,
filepath= os.path.join(output_folder, "colorbar_{:03d}.png".format(frame_no)),
colormap_min=colormap_min, colormap_max=colormap_max, )
return stat, colormap_vis
[docs] def flowCurl_function(self,
output_folder="curl",
smoothing=None,
colormap=None,
colormap_min=None, colormap_max=None,
):
"""
Calculates gradients and output the curl values for an image stack
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store flow derivative images
smoothing : int or Bool
Sigma value for gaussian smoothing
colormap_min, colormap_max : float
Minimum and/or maximum value that is used to determine the normalization of the colormap
Returns
-------
See Also
--------
flowDerivative_iteration : base function to generate flow derivative images
"""
curl_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(curl_folder, exist_ok=True)
curl_string = 'curl_{:03d}.tif'
colormap = colormap_functions.parse_colormap(colormap)
curl_stat_matrix = np.array([[-1, 0], [0, 1]])
for img_no in tqdm(range(self.length - 1)):
self.flowDerivative_iteration(curl_folder, img_no, curl_stat_matrix, curl_string,
smoothing=smoothing,
colormap=colormap,
colormap_min=colormap_min, colormap_max=colormap_max,
)
[docs] def flowDivergence_function(self,
output_folder="divergence",
smoothing=None,
colormap=None,
colormap_min=None, colormap_max=None,
):
"""
Calculates gradients and output the divergence values for an image stack
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store flow derivative images
smoothing : int or Bool
Sigma value for gaussian smoothing
colormap_min, colormap_max : float
Minimum and/or maximum value that is used to determine the normalization of the colormap
Returns
-------
See Also
--------
flowDerivative_iteration : base function to generate flow derivative images
"""
divergence_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(divergence_folder, exist_ok=True)
divergence_string = 'divergence_{:03d}.tif'
colormap = colormap_functions.parse_colormap(colormap)
divergence_stat_matrix = np.array([[0, 1], [1, 0]])
for img_no in tqdm(range(self.length - 1)):
self.flowDerivative_iteration(divergence_folder, img_no, divergence_stat_matrix, divergence_string,
smoothing=smoothing,
colormap=colormap,
colormap_min=colormap_min, colormap_max=colormap_max,
)
[docs] def flowShear_function(self,
output_folder="shear",
smoothing=None,
colormap=None,
colormap_min=None, colormap_max=None,
):
"""
Calculates gradients and output the simple shear values for an image stack
Parameters
----------
output_folder : str
Folder name that will be created in the main directory to store flow derivative images
smoothing : int or Bool
Sigma value for gaussian smoothing
colormap_min, colormap_max : float
Minimum and/or maximum value that is used to determine the normalization of the colormap
Returns
-------
See Also
--------
flowDerivative_iteration : base function to generate flow derivative images
"""
shear_folder = os.path.join(self.forward_velocity_folder, output_folder)
os.makedirs(shear_folder, exist_ok=True)
shear_string = 'shear_{:03d}.tif'
colormap = colormap_functions.parse_colormap(colormap)
shear_stat_matrix = np.array([[1, 0], [0, 1]])
for img_no in tqdm(range(self.length - 1)):
self.flowDerivative_iteration(shear_folder, img_no, shear_stat_matrix, shear_string,
smoothing=smoothing,
colormap=colormap,
colormap_min=colormap_min, colormap_max=colormap_max,
)