import cv2
import numpy as np
from numba import njit, prange
from numba.typed import List
[docs]@njit(parallel=True)
def fast_distance_function(centroids, pos):
"""Calculates pairwise Euclidean distances between two array of coordinates
Parameters
----------
centroids : array_like
List of centroid coordinates of shape (n,2)
pos : array_like
List of position coordinates of shape (m,2)
Returns
-------
d : array_like
Pairwise distance between coordinates
"""
n = centroids.shape[0]
m = pos.shape[0]
d = np.empty((m, n), dtype=centroids.dtype)
for i in prange(m):
for j in range(n):
d[i, j] = ((centroids[j, 0] - pos[i, 0]) ** 2 + (centroids[j, 1] - pos[i, 1]) ** 2) ** 0.5
return d
[docs]@njit
def calculate_distance(x, y, circular=True):
"""Calculates distances between consecutive points in the list
Parameters
----------
x, y : array_like
List of x, y position to calculate the Euclidean distances from
circular : bool, optional
Determine if the distance between the last and first point should be calculated
Returns
-------
dist : array_like
List of distances between consecutive points
"""
dist = np.sqrt((x[1:] - x[:-1]) ** 2 + (y[1:] - y[:-1]) ** 2)
if circular is True:
np.append(dist, np.sqrt((x[-1] - x[0]) ** 2 + (y[-1] - y[0]) ** 2))
return dist
[docs]@njit
def reslice_positions(x, y, num_points=21):
"""Interpolates a list of x, y positions to a list of evenly spaced points
Parameters
----------
x, y : array_like
List of x, y position to interpolate from
num_points : int, optional
Number of points with which to generate from the list
Returns
-------
output_x, output_y : array_like
List of interpolated points
"""
output_x = np.empty(num_points, np.float32)
output_y = np.empty(num_points, np.float32)
profile_length = np.sum(calculate_distance(x, y))
spacing = profile_length / num_points
pos = 0
for pt in range(num_points):
if pt == 0:
current_x = x[0]
current_y = y[0]
else:
residual_spacing = spacing
while True:
pos += 1
if pos >= x.shape[0]:
break
distance = np.sqrt((x[pos] - current_x) ** 2 + (y[pos] - current_y) ** 2)
if distance >= residual_spacing:
current_x = current_x + (x[pos] - current_x) / distance * residual_spacing
current_y = current_y + (y[pos] - current_y) / distance * residual_spacing
break
else:
residual_spacing = residual_spacing - distance
current_x = x[pos]
current_y = y[pos]
output_x[pt] = current_x
output_y[pt] = current_y
return output_x, output_y
[docs]@njit(parallel=True)
def get_pairs(properties, remapped_target_image, coord_pos=5):
"""Identifies potential pairs from a warped image based on the centroid positions
Parameters
----------
properties : array_like
List of properties for each object in the image
remapped_target_image : ndarray
Remapped image that has been warped with a velocity field
coord_pos : int, optional
Index indicating the position of the coordinates inside properties
Returns
-------
forward_pair_list : array_like
Array containing the list of pairs
"""
length = len(properties)
forward_pair_list = np.full(length, np.nan)
for i in range(length):
obj = properties[i][coord_pos]
lst = []
for j in range(len(obj)):
lst.append(remapped_target_image[obj[j, 0], obj[j, 1]])
counts = np.bincount(lst)
counts = counts / len(lst)
forward_pair_list[i] = np.argmax(counts)
return forward_pair_list
[docs]@njit(parallel=True)
def calculate_matched_velocities(initial_properties, target_properties, reverse_pair_list,
forward_index, forward_unique, forward_bincount,
reverse_unique, reverse_bincount, diff_search,
matched_flow_x, matched_flow_y,
selected_initial_mask, selected_target_mask,
label_pos=0,
):
"""Calculates the velocities between matched pairs
Parameters
----------
initial_properties : array_like
List of properties for each object in the initial image
target_properties : array_like
List of properties for each object in the target image
reverse_pair_list : array_like
List of pairs between the target image and the warped initial image
forward_index : array_like
Indices of the first occurrence of the pairs
forward_unique : array_like
Label of the corresponding pair
forward_bincount : array_like
Number of times that the pair is returned
reverse_unique : array_like
Label of the corresponding pair in the reverse time direction
reverse_bincount : array_like
Number of times that the pair is returned in the reverse time direction
diff_search : int
Difference between the largest value of the labelled image and the length of the target property.
Used to reduce the number of items used in the search to find a match
matched_flow_x : ndarray
Calculated velocities in the x direction
matched_flow_y : ndarray
Calculated velocities in the y direction
selected_initial_mask : array_like
Mask to track which objects in the initial image have been paired
selected_target_mask : array_like
Mask to track which objects in the target image have been paired
label_pos : int, optional
Index indicating the position of the labels inside properties
Returns
-------
matched_flow_x : ndarray
Calculated velocities in the x direction
matched_flow_y : ndarray
Calculated velocities in the y direction
selected_initial_mask : array_like
Mask to track which objects in the initial image have been paired
selected_target_mask : array_like
Mask to track which objects in the target image have been paired
"""
for i in prange(len(forward_index)):
initial_index = forward_index[i]
target_value = forward_unique[i]
forward_count = forward_bincount[i]
search_start = np.maximum(target_value - diff_search - 1, 0)
search_end = np.minimum(len(target_properties) + 1, target_value + 1)
search_field = target_properties[search_start: search_end]
for k in range(len(search_field)):
if search_field[k][label_pos] == target_value:
target_index = search_start + k
initial_label = initial_properties[initial_index][label_pos]
reverse_counts = reverse_bincount[np.where(reverse_unique == initial_label)[0]]
if len(reverse_counts) > 0:
reverse_count = reverse_counts[0]
if reverse_pair_list[target_index] == initial_label:
if forward_count == 1 and reverse_count == 1:
coords, output_velocity = single_object_velocity(initial_properties[initial_index],
target_properties[target_index])
if coords is not None:
for ind in range(len(coords)):
matched_flow_x[np.int32(coords[ind, 0]), np.int32(coords[ind, 1])] = output_velocity[ind, 1]
matched_flow_y[np.int32(coords[ind, 0]), np.int32(coords[ind, 1])] = output_velocity[ind, 0]
selected_initial_mask[initial_index] = 0
selected_target_mask[target_index] = 0
return matched_flow_x, matched_flow_y, selected_initial_mask, selected_target_mask
[docs]@njit
def single_object_velocity(object1_properties, object2_properties,
local_centroid_pos=1, centroid_pos=2,
orientation_pos=3, major_axis_pos=4,
coords_pos=5, contour_pos=6):
"""Calculated the velocity of the object between two time frames
Parameters
----------
object1_properties : array_like
List of properties for the object in time frame 1
object2_properties : array_like
List of properties for the object in time frame 2
local_centroid_pos : int, optional
Index indicating the position of the local centroid potision inside properties
centroid_pos : int, optional
Index indicating the position of the centroid position inside properties
orientation_pos : int, optional
Index indicating the position of the orientation inside properties
major_axis_pos : int, optional
Index indicating the position of the major axis length inside properties
coords_pos : int, optional
Index indicating the position of the coordinates inside properties
contour_pos : int, optional
Index indicating the position of the contour inside properties
Returns
-------
pt1_coords : array_like
Coordinates of all the pixels in the object
output_velocity : array_like
Velocity for each pixel in the object
"""
pt1_local_centroid = np.array(object1_properties[local_centroid_pos])
pt1_centroid = np.array(object1_properties[centroid_pos])
pt1_orientation = object1_properties[orientation_pos]
pt1_major_axis = object1_properties[major_axis_pos]
pt1_coords = object1_properties[coords_pos]
pt1_contour = object1_properties[contour_pos]
pt1_perimeter = obtain_perimeter_points(pt1_local_centroid, pt1_orientation, pt1_major_axis, pt1_contour)
pt2_local_centroid = np.array(object2_properties[local_centroid_pos])
pt2_centroid = np.array(object2_properties[centroid_pos])
pt2_orientation = object2_properties[orientation_pos]
if np.abs(pt2_orientation - pt1_orientation) > np.pi / 2:
pt2_orientation = np.pi + pt2_orientation
pt2_major_axis = object2_properties[major_axis_pos]
pt2_contour = object2_properties[contour_pos]
pt2_perimeter = obtain_perimeter_points(pt2_local_centroid, pt2_orientation, pt2_major_axis, pt2_contour)
affine = estimate_affine(pt1_perimeter, pt2_perimeter)
if affine is not None:
temp_coords = pt1_coords - pt1_centroid + pt1_local_centroid
temp_coords = np.ascontiguousarray(temp_coords.astype(np.float32).T)
new_coords = (np.ascontiguousarray(affine[:, :2]) @ temp_coords).T + affine[:, 2]
velocity = new_coords - temp_coords.T
distance = np.sqrt((pt1_coords[:, 1] - pt1_centroid[1]) ** 2 + (pt1_coords[:, 0] - pt1_centroid[0]) ** 2)
centroid_index = np.argmin(distance)
local_velocity = velocity - velocity[centroid_index, :]
output_velocity = local_velocity + pt2_centroid - pt1_centroid
return pt1_coords, output_velocity
else:
return None, None
[docs]@njit
def obtain_perimeter_points(local_centroid, orientation, major_axis, contour):
"""Obtain points on the object perimeter starting from the top most point on the long axis of the fitted ellipse
Parameters
----------
local_centroid : array_like
Local centroid position of the object
orientation : float
Orientation of the object
major_axis : float
Length of the major axis of the object
contour : array_like
List of points on the contour of the object
Returns
-------
pt_list : array_like
Sorted coordinates of the perimeter points starting from the top most point on the long axis
"""
top_pt = local_centroid + np.array((np.cos(orientation) * major_axis / 2, np.sin(orientation) * major_axis / 2))
distance = np.sqrt((contour[:, 0] - top_pt[1]) ** 2 + (contour[:, 1] - top_pt[0]) ** 2)
start_point = np.argmin(distance)
contour = np.vstack((contour[start_point:, ...], contour[:start_point, ...]))
pt_x, pt_y = reslice_positions(contour[:, 0], contour[:, 1])
pt_y = np.append(pt_y, local_centroid[0])
pt_x = np.append(pt_x, local_centroid[1])
pt_list = np.vstack((pt_y, pt_x)).T
pt_list = pt_list.astype(np.float32)
return pt_list
[docs]@njit
def estimate_affine(pt1_perimeter, pt2_perimeter):
"""Calculates the affine matrix needed to transform the points in the perimeter of the first object to the second object
Parameters
----------
pt1_perimeter : array_like
List of coordinates on the perimeter of the first object
pt2_perimeter : float
List of coordinates on the perimeter of the second object
Returns
-------
solution : array_like
Affine matrix used to describe the transformation between the two sets of points
"""
coefficient = np.empty((len(pt1_perimeter) * 2, 6))
ordinate = np.empty((len(pt1_perimeter) * 2))
for i in range(len(pt1_perimeter)):
coefficient[2 * i] = [pt1_perimeter[i, 0], pt1_perimeter[i, 1], 1, 0, 0, 0]
ordinate[2 * i] = pt2_perimeter[i, 0]
coefficient[2 * i + 1] = [0, 0, 0, pt1_perimeter[i, 0], pt1_perimeter[i, 1], 1]
ordinate[2 * i + 1] = pt2_perimeter[i, 1]
solution = np.linalg.lstsq(coefficient, ordinate)[0]
solution = np.reshape(solution, (2, 3))
solution = solution.astype(np.float32)
return solution
[docs]def calculate_contour(label_image):
"""Calculates the contour of the object
Parameters
----------
label_image : ndarray
Local image of each object
Returns
-------
contour : array_like
List of points describing the contour of the object
"""
contour, _ = cv2.findContours(np.array(label_image, dtype=np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
if len(contour) > 1:
lengths = np.array([x.shape[0] for x in contour])
index = np.argmax(lengths)
contour = contour[index].squeeze(axis=1)
else:
contour = contour[0].squeeze(axis=1)
return contour
[docs]@njit
def remove_items(properties, mask):
"""Removes objects from the list of properties based on a binary mask
Parameters
----------
properties : array_like
List of object properties
mask : array_like
Binary mask indicating which objects should be removed
Returns
-------
output : array_like
Shortened list of properties with objects removed based on mask
"""
output = List()
for i in range(len(properties)):
if mask[i] == 1:
output.append(properties[i])
return output