import numpy as np
import scipy.signal
import skimage.registration
from . import lib, warping
[docs]def ilk(
image_pair,
U,
window_size=16,
prefilter=True,
window="gaussian",
velocity_upsample_kind="linear",
warp_direction="center",
warp_order=1,
warp_nsteps=1,
):
r"""Disparity map calculation using iterative Lucas Kanade ("ilk").
Parameters
----------
image_pair : np.ndarray
Image pairs :math:`\mathbf{I} = (I_0, I_1)^{\top}` of size (2 x rows x cols).
U : np.ndarray
Sparse or dense 2D velocity field :math:`\mathbf{U} = (u, v)^{\top}` of (2 x U_rows x U_cols).
window_size : int, default: 16
Window size around the pixel to consider the disparity for optical flow estimator.
window : {"gaussian", "tophat"}, default: "gaussian"
Windowing kernel type for integration around the pixel.
prefilter : bool, default: True
Whether to prefilter the estimated optical flow before each image warp. When True, a median filter with window
size 3 along each axis is applied. This helps to remove potential outliers.
velocity_upsample_kind : {"linear", "cubic", "quintic"}, default: "linear"
Velocity upsampling kind for spline interpolation `scipy.interpolation.interp2d`.
warp_direction : {"forward", "center", "backward"}, default: "center"
Warping direction.
warp_order : 1-5, default: 1
The order of interpolation for `skimage.transform.warp`.
warp_nsteps : int, default: 5
Number of sub-steps to use for warping to improve accuracy.
Returns
-------
X, Y : np.ndarray
`x` and `y` coordinates of disparity map.
D : np.ndarray
pixel-wise 2D disparity map :math:`\mathbf{D} = (d_x, d_y)^\top` of size (2 x rows x cols).
See Also
--------
skimage.registration.optical_flow_ilk : Coarse to fine optical flow estimator.
skimage.transform.warp : Warp an image according to a given coordinate transformation.
"""
# Image dimensions
nr, nc = image_pair[0].shape
# Warp image: $\hat{\mathbf{I}}$
warped_frame_a, warped_frame_b = warping.warp(
image_pair,
U,
velocity_upsample_kind=velocity_upsample_kind,
direction=warp_direction,
nsteps=warp_nsteps,
order=warp_order,
)
# windowing type
gaussian = True if window == "gaussian" else False
# Disparity map using Lucas Kanade
optical_flow = skimage.registration.optical_flow_ilk(
warped_frame_a,
warped_frame_b,
radius=window_size // 2,
gaussian=gaussian,
prefilter=prefilter,
)
# Disparity map
D = np.abs(optical_flow[::-1])
# Image coordinates
Y, X = np.meshgrid(np.arange(nr), np.arange(nc), indexing="ij")
return X, Y, D
[docs]def sws(
image_pair,
U,
window_size=16,
grid_size=4,
window="gaussian",
radius=1,
sliding_window_subtraction=True,
ROI=None,
velocity_upsample_kind="linear",
warp_direction="center",
warp_order=1,
warp_nsteps=1,
):
r"""Python implementation of `Sciacchitano-Wieneke-Scarano` algorithm of PIV Uncertainty Quantification by image
matching [1]_.
Parameters
----------
image_pair : np.ndarray
Image pairs :math:`\mathbf{I} = (I_0, I_1)^{\top}` of size (2 x rows x cols).
U : np.ndarray
Sparse or dense 2D velocity field :math:`\mathbf{U} = (u, v)^{\top}` of (2 x U_rows x U_cols).
window_size : int, default: 16
Window size around the pixel to consider the disparity ensemble.
grid_size: int, default: 4
Disparity ensemble grid resolution in pixels.
window : {"gaussian", "tophat"}, default: "gaussian"
Window type for the disparity statistics.
radius : int, default: 1
Search radius for particle peak position.
sliding_window_subtraction : bool, default: False
Whether to use the sliding window subtraction before disparity vector calculation.
ROI : tuple, default: None
Region of interest to use for calculating the disparity ensemble (`i_min`, `i_max`, `j_min`, `j_max`).
velocity_upsample_kind : {"linear", "cubic", "quintic"}, default: "linear"
Velocity upsampling kind for spline interpolation `scipy.interpolation.interp2d`.
warp_direction : {"forward", "center", "backward"}, default: "center"
Warping direction.
warp_order : 1-5, default: 1
The order of interpolation for `skimage.transform.warp`.
warp_nsteps : int, default: 5
Number of sub-steps to use for warping to improve accuracy.
Returns
-------
X, Y : np.ndarray
`x` and `y` coordinates of disparity map.
delta : np.ndarray
Instantaneous error estimation map of size :math:`2 \times N \times M` defined by Eq. (3) [1]_.
N : np.ndarray
Number of peaks inside the window.
mu : np.ndarray
Mean disparity map of size :math:`2 \times N \times M` defined by Eq. (3) [1]_.
sigma : np.ndarray
Standard deviation disparity map of size :math:`2 \times N \times M` defined by Eq. (3) [1]_.
References
----------
.. [1] Sciacchitano, A., Wieneke, B., & Scarano, F. (2013). PIV uncertainty quantification by image matching.
Measurement Science and Technology, 24 (4). https://doi.org/10.1088/0957-0233/24/4/045302
"""
# Image dimensions
nr, nc = image_pair[0].shape
# Step 1: Warp image: $\hat{\mathbf{I}}$
warped_image_pair = warping.warp(
image_pair,
U,
velocity_upsample_kind=velocity_upsample_kind,
direction=warp_direction,
nsteps=warp_nsteps,
order=warp_order,
)
# Step 2: Disparity vector computation
if sliding_window_subtraction:
sliding_window_size = window_size
else:
sliding_window_size = None
D, c = lib.disparity_vector_computation(
warped_image_pair,
radius=radius,
sliding_window_size=sliding_window_size,
)
# Step 3: Disparity statistics calculation
# Gaussian windowing
wr = int(np.round(window_size / 2))
if window == "gaussian":
coeff = 1.75
weights = scipy.signal.windows.gaussian(int(np.round(wr * 2 * coeff)) + 1, int(np.round(wr / 2 * coeff)))
elif window == "tophat":
coeff = 1
weights = np.ones(wr * 2 * coeff + 1)
else:
raise ValueError(f"Window type `{window}` not valid.")
weights = np.outer(weights, weights) # 2D windowing weights
if ROI is None:
ROI = (0, nr, 0, nc)
else:
ROI = tuple(ROI)
# Accumulate disparity statistics within the window (numba accelerated loop)
delta, N, mu, sigma = lib.disparity_ensemble_statistics(D, c, weights, wr, grid_size, coeff, ROI)
# Coordinates
Y, X = np.meshgrid(
np.arange(grid_size // 2, nr, grid_size), np.arange(grid_size // 2, nc, grid_size), indexing="ij"
)
return X, Y, delta, N, mu, sigma