import numpy as np
import scipy.ndimage
from numba import jit, prange
def sliding_avg_subtract(im, window_size) -> np.ndarray:
    r"""Subtract a smoothed background estimate from the image.

    The "sliding average" background is approximated by a Gaussian blur
    whose sigma is derived from the requested window size.

    Parameters
    ----------
    im : np.ndarray
        Image of size :math:`N \times M`.
    window_size : int
        Window size used to derive the Gaussian sigma.

    Returns
    -------
    np.ndarray
        Average subtracted image of size :math:`N \times M`.
    """
    sigma = window_size // 2 + 1
    background = scipy.ndimage.gaussian_filter(im, sigma=sigma)
    return im - background
[docs]def construct_subpixel_position_map(im):
r"""Construct sub-pixel position map based on 3-point Gaussian fit stencil.
Parameters
----------
im : np.ndarray
Image of size :math:`N \times M`.
Returns
-------
X_sub, Y_sub : np.ndarray
Subpixel position map of size :math:`N \times M`.
"""
eps = np.finfo(np.float64).eps
# Log of image
im = np.log(im)
# Shifted images
im_E = np.zeros_like(im) + eps
im_W = np.zeros_like(im) + eps
im_S = np.zeros_like(im) + eps
im_N = np.zeros_like(im) + eps
im_E[:, :-1] = im[:, 1:]
im_W[:, 1:] = im[:, :-1]
im_N[1:, :] = im[:-1, :]
im_S[:-1, :] = im[1:, :]
# Subpixel position
X_sub = (im_W - im_E) / 2.0 / (im_E + im_W - 2 * im)
Y_sub = (im_S - im_N) / 2.0 / (im_S + im_N - 2 * im)
X_sub[~np.isfinite(X_sub)] = 0
Y_sub[~np.isfinite(Y_sub)] = 0
return X_sub, Y_sub
@jit(nopython=True, cache=True)
def find_particle(im, ic, jc, radius=1):
    r"""Particle peak position finder around the radius of centroid.

    Scans square windows of growing half-width ``r = 0..radius`` centred on
    ``(ic, jc)`` and stops as soon as a strict 4-neighbour local maximum is
    found.

    Parameters
    ----------
    im : np.ndarray
        Image array of size :math:`N \times M`. Assumed strictly positive,
        so ``imax == 0`` means "no peak found yet".
    ic, jc : int
        Row and column index of centroid.
    radius : int
        Search radius of centroid.

    Returns
    -------
    int, int
        Row index and column index of peak; ``(0, 0)`` when no peak is found.
    """
    n, m = im.shape
    r, iPeak, jPeak, imax = 0, 0, 0, 0
    while r <= radius and imax == 0:
        # Bounds are clamped to [1, n-2] x [1, m-2] so the 4-neighbour
        # comparisons never index out of range. The upper bound is made
        # inclusive (+1) so the window is symmetric around the centroid;
        # the previous exclusive bound skipped the bottom/right ring,
        # made the r = 0 pass scan nothing, and dropped the valid row
        # n-2 / column m-2.
        for i in range(max(1, ic - r), min(ic + r + 1, n - 1)):
            for j in range(max(1, jc - r), min(jc + r + 1, m - 1)):
                if (
                    (im[i, j] > im[i, j - 1])
                    & (im[i, j] > im[i - 1, j])
                    & (im[i, j] > im[i, j + 1])
                    & (im[i, j] > im[i + 1, j])
                    & (im[i, j] > imax)
                ):
                    iPeak = i
                    jPeak = j
                    imax = im[i, j]
        r += 1
    return iPeak, jPeak
def find_peaks(imgPI) -> np.ndarray:
    r"""Particle peak position detection according to Eq. (1) [1]_.

    A pixel is a peak when it is strictly greater than its four direct
    neighbours; border pixels are never peaks. Peaks are weighted by
    :math:`\sqrt{|\Pi|}` to suppress background.

    Parameters
    ----------
    imgPI : np.ndarray
        Image intensity product :math:`\Pi` of size :math:`N \times M`.

    Returns
    -------
    np.ndarray
        Peak map :math:`\varphi` of size :math:`N \times M`.
    """
    center = imgPI[1:-1, 1:-1]
    # Strict local maximum over the 4-connected neighbourhood
    is_peak = (
        (center > imgPI[1:-1, 2:])
        & (center > imgPI[1:-1, :-2])
        & (center > imgPI[:-2, 1:-1])
        & (center > imgPI[2:, 1:-1])
    )
    peaks = np.zeros_like(imgPI)
    peaks[1:-1, 1:-1] = is_peak
    # Threshold background: weight peaks by sqrt(|Pi|)
    peaks *= np.sqrt(np.abs(imgPI))
    return peaks
@jit(nopython=True, parallel=True, cache=True)
def disparity_ensemble_statistics(D, c, weights, wr, grid_size, coeff, ROI):
    r"""Numba accelerated loop for computing the disparity statistics inside a window of radius `wr`.

    Parameters
    ----------
    D : np.ndarray
        Disparity map :math:`D` of size :math:`2 \times N \times M` defined by Eq. (2) [1]_.
    c : np.ndarray
        Disparity weight map :math:`c` of size :math:`N \times M` defined by Eq. (3) [1]_.
    weights : np.ndarray
        Windowing weights of size :math:`N \times M` defined by Gaussian or tophat filter.
    wr : int
        Window radius.
    grid_size : int
        Output grid spacing in pixels; statistics are evaluated at the
        centre of every ``grid_size``-sized cell.
    coeff : float
        Confidence interval coefficient (scales the effective window radius).
    ROI : tuple
        Row and column indices of the ROI: (`i_min`, `i_max`, `j_min`, `j_max`).

    Returns
    -------
    delta : np.ndarray
        Instantaneous error estimation map of size :math:`2 \times n \times m` defined by Eq. (3) [1]_.
    N : np.ndarray
        Number of peaks inside the window, size :math:`n \times m` (clipped below at 1).
    mu : np.ndarray
        Mean disparity map of size :math:`2 \times n \times m` defined by Eq. (3) [1]_.
    sigma : np.ndarray
        Standard deviation disparity map of size :math:`2 \times n \times m` defined by Eq. (3) [1]_.

    References
    ----------
    .. [1] Sciacchitano, A., Wieneke, B., & Scarano, F. (2013). PIV uncertainty quantification by image matching.
        Measurement Science and Technology, 24 (4). https://doi.org/10.1088/0957-0233/24/4/045302.
    """
    # Output grid dimensions: one statistics sample per grid cell.
    n, m = D.shape[1] // grid_size, D.shape[2] // grid_size
    # Effective window radius after scaling by the confidence coefficient.
    wr_eff = int(np.round(coeff * wr))
    # Uncertainty statistics
    N = np.zeros((n, m))
    mu = np.zeros((2, n, m))
    sigma = np.zeros((2, n, m))
    delta = np.zeros((2, n, m))
    for ii in prange(n):
        # Pixel row at the centre of grid cell ii.
        i = ii * grid_size + grid_size // 2
        # Row bounds of window (clipped to the image)
        i0 = max(i - wr_eff, 0)
        i1 = min(i + wr_eff, n * grid_size - 1)
        for jj in prange(m):
            # Pixel column at the centre of grid cell jj.
            j = jj * grid_size + grid_size // 2
            # Only calculate inside ROI
            if (i >= ROI[0]) and (i <= ROI[1]) and (j >= ROI[2]) and (j <= ROI[3]):
                # Column bounds of window (clipped to the image)
                j0 = max(j - wr_eff, 0)
                j1 = min(j + wr_eff, m * grid_size - 1)
                # Filter weights, cropped to match the clipped window bounds
                # so weights and data slices stay aligned at the image edges.
                weights_w = weights[
                    wr_eff - (i - i0) : wr_eff + (i1 - i),
                    wr_eff - (j - j0) : wr_eff + (j1 - j),
                ]
                # Peaks windowed (peak weights times filter weights)
                c_w = c[i0:i1, j0:j1] * weights_w
                # Number of peaks inside window, clipped at 1 to avoid a
                # division by zero in delta below. NOTE(review): weights_w
                # enters twice here (c_w already contains it); this looks
                # like a bug carried over from the original code — confirm
                # against Eq. (3) of the reference.
                N[ii, jj] = np.maximum(np.sum((c_w > 0) * weights_w), 1)
                for k in range(2):
                    # Disparity windowed
                    d_w = D[k, i0:i1, j0:j1].ravel()
                    c_w = (c[i0:i1, j0:j1] * weights_w).ravel()
                    # Keep only non-zero entries (pixels with detected peaks);
                    # note the two masks can differ when a disparity component
                    # is exactly zero at a peak.
                    d_w = d_w[np.nonzero(d_w)]
                    c_w = c_w[np.nonzero(c_w)]
                    # Outlier removal: 3-sigma clipping around the window mean
                    valid_mask = np.where(np.abs(d_w - np.mean(d_w)) <= 3 * np.std(d_w))
                    d_w = d_w[valid_mask]
                    c_w = c_w[valid_mask]
                    # Mean disparity (bias): Eq. (3) (left)
                    mu[k, ii, jj] = np.sum(c_w * d_w) / np.sum(c_w)
                    # Std. dev. disparity (rms): Eq. (3) (right)
                    sigma[k, ii, jj] = np.sqrt(np.sum(c_w * (d_w - mu[k, ii, jj]) ** 2) / np.sum(c_w))
                    # Instantaneous error estimation
                    delta[k, ii, jj] = np.sqrt(mu[k, ii, jj] ** 2 + (sigma[k, ii, jj] ** 2 / N[ii, jj]))
    return delta, N, mu, sigma
def disparity_vector_computation(warped_image_pair, radius=2.0, sliding_window_size=16):
    r"""Python implementation of `Sciacchitano-Wieneke-Scarano` disparity vector computation algorithm for PIV
    Uncertainty Quantification by image matching [1]_.

    Parameters
    ----------
    warped_image_pair : np.ndarray
        Warped image pair :math:`\hat{\mathbf{I}} = (\hat{I}_0, \hat{I}_1)^{\top}` of size :math:`2 \times N \times M`.
    radius : int, default: 2
        Discrete particle position search radius from the centroid defined by :math:`\varphi`.
    sliding_window_size : int, default: 16
        Sliding window average subtraction window size (falsy value disables the subtraction).

    Returns
    -------
    D : np.ndarray
        Disparity map :math:`D` of size :math:`2 \times N \times M` defined by Eq. (2).
    c : np.ndarray
        Disparity weight map :math:`c` of size :math:`N \times M` defined by Eq. (3).
    """
    img_a, img_b = warped_image_pair
    # Work in floating point regardless of input dtype
    img_a = img_a.astype("float")
    img_b = img_b.astype("float")
    # Optional background removal before forming the intensity product
    if sliding_window_size:
        img_a = sliding_avg_subtract(img_a, sliding_window_size)
        img_b = sliding_avg_subtract(img_b, sliding_window_size)
    # Image intensity product (Eq. 1): :math:`\Pi = \hat{I}_1\hat{I}_2`
    imgPI = img_a * img_b
    # Pixels where both frames are positive
    img_pos = (img_a > 0) & (img_b > 0)
    # Shift both frames to strictly positive values (required by the
    # log-based sub-pixel fit and the peak search)
    eps = np.finfo(np.float64).eps
    img_a = img_a - np.min(img_a) + eps
    img_b = img_b - np.min(img_b) + eps
    # Candidate particle peaks of the intensity product
    peaks = find_peaks(imgPI)
    peak_rows, peak_cols = np.where(peaks)
    # Integer pixel coordinate grids
    Xo, Yo = np.meshgrid(
        np.arange(imgPI.shape[1], dtype="float"),
        np.arange(imgPI.shape[0], dtype="float"),
    )
    # One coordinate copy per frame; peak positions are refined in place
    X = np.stack((Xo, Xo))
    Y = np.stack((Yo, Yo))
    # Sub-pixel corrections for each frame
    X_A_sub, Y_A_sub = construct_subpixel_position_map(img_a)
    X_B_sub, Y_B_sub = construct_subpixel_position_map(img_b)
    for i, j in zip(peak_rows, peak_cols):
        iAp, jAp = find_particle(img_a, i, j, radius=radius)
        iBp, jBp = find_particle(img_b, i, j, radius=radius)
        # A zero index means "no peak found" in that frame
        if min(iAp, jAp, iBp, jBp) > 0:
            X[0, i, j] = Xo[iAp, jAp] + X_A_sub[iAp, jAp]
            Y[0, i, j] = Yo[iAp, jAp] - Y_A_sub[iAp, jAp]
            X[1, i, j] = Xo[iBp, jBp] + X_B_sub[iBp, jBp]
            Y[1, i, j] = Yo[iBp, jBp] - Y_B_sub[iBp, jBp]
        else:
            peaks[i, j] = 0
    # (Eq. 3): disparity weights $c$
    c = peaks * (imgPI > 0) * img_pos
    # Disparity map: $D$
    D = np.stack((X[1] - X[0], Y[1] - Y[0])) * (imgPI > 0) * img_pos
    return D, c