import os
import numpy as np
from ndcube import NDCube
from numpy.lib.stride_tricks import as_strided
from punchbowl.data import load_ndcube_from_fits
from punchbowl.prefect import punch_task
[docs]
def sliding_window(arr: np.ndarray, window_size: int) -> np.ndarray:
"""
Construct a sliding window view of the array.
borrowed from: https://stackoverflow.com/questions/10996769/pixel-neighbors-in-2d-array-image-using-python.
"""
arr = np.asarray(arr)
window_size = int(window_size)
if arr.ndim != 2:
msg = "need 2-D input"
raise ValueError(msg)
if not (window_size > 0):
msg = "need a positive window size"
raise ValueError(msg)
shape = (arr.shape[0] - window_size + 1, arr.shape[1] - window_size + 1, window_size, window_size)
if shape[0] <= 0:
shape = (1, shape[1], arr.shape[0], shape[3])
if shape[1] <= 0:
shape = (shape[0], 1, shape[2], arr.shape[1])
strides = (arr.shape[1] * arr.itemsize, arr.itemsize, arr.shape[1] * arr.itemsize, arr.itemsize)
return as_strided(arr, shape=shape, strides=strides)
[docs]
def cell_neighbors(arr: np.ndarray, i: int, j: int, window_size: int = 1) -> np.ndarray:
"""
Return d-th neighbors of cell (i, j).
borrowed from: https://stackoverflow.com/questions/10996769/pixel-neighbors-in-2d-array-image-using-python.
"""
window = sliding_window(arr, 2 * window_size + 1)
ix = np.clip(i - window_size, 0, window.shape[0] - 1)
jx = np.clip(j - window_size, 0, window.shape[1] - 1)
i0 = max(0, i - window_size - ix)
j0 = max(0, j - window_size - jx)
i1 = window.shape[2] - max(0, window_size - i + ix)
j1 = window.shape[3] - max(0, window_size - j + jx)
return window[ix, jx][i0:i1, j0:j1].ravel()
[docs]
def mean_correct(
data_array: np.ndarray, mask_array: np.ndarray, required_good_count: int = 3, max_window_size: int = 10,
) -> np.ndarray:
"""Mean correct."""
x_bad_pix, y_bad_pix = np.where(mask_array == 0)
data_array[mask_array == 0] = 0
output_data_array = data_array.copy()
for x_i, y_i in zip(x_bad_pix, y_bad_pix, strict=False):
window_size = 1
number_good_px = np.sum(cell_neighbors(mask_array, x_i, y_i, window_size=window_size))
while number_good_px < required_good_count:
window_size += 1
number_good_px = np.sum(cell_neighbors(mask_array, x_i, y_i, window_size=window_size))
if window_size > max_window_size:
break
output_data_array[x_i, y_i] = (
np.sum(cell_neighbors(data_array, x_i, y_i, window_size=window_size)) / number_good_px
)
return output_data_array
[docs]
def remove_deficient_pixels(data: NDCube,
deficient_pixels: np.ndarray,
required_good_count: int = 3,
max_window_size: int = 10,
method: str = "median") -> NDCube:
"""Remove deficient pixels."""
# check dimensions match
if data.data.shape != deficient_pixels.shape:
msg = (
"deficient_pixel_array expects the data_object and"
"deficient_pixel_array arrays to have the same dimensions."
f"data_array dims: {data.data.shape}"
f"and deficient_pixel_map dims: {deficient_pixels.shape}"
)
raise ValueError(
msg,
)
if np.all(deficient_pixels == 0):
msg = "All pixels in mask are deficient and cannot be corrected."
raise ValueError(msg)
if method == "median":
data_array = median_correct(
data.data, deficient_pixels, required_good_count=required_good_count, max_window_size=max_window_size,
)
elif method == "mean":
data_array = mean_correct(
data.data, deficient_pixels, required_good_count=required_good_count, max_window_size=max_window_size,
)
else:
msg = f"method specified must be 'mean', or 'median'. Found method={method}"
raise ValueError(msg)
# Set deficient pixels to complete uncertainty
output_uncertainty = data.uncertainty.array.copy()
output_uncertainty[deficient_pixels == 0] = np.inf
data.data[...] = data_array[...]
data.uncertainty.array = output_uncertainty
return data
[docs]
@punch_task
def remove_deficient_pixels_task(
data: NDCube,
deficient_pixel_map_path: str | None,
required_good_count: int = 3,
max_window_size: int = 10,
method: str = "median",
) -> NDCube:
"""
Subtracts a deficient pixel map from an input data frame.
checks the dimensions of input data frame and map match and
subtracts the background model from the data frame of interest.
Parameters
----------
data : NDCube
A PUNCHobject data frame to be background subtracted
deficient_pixel_map_path : Optional[str]
The path to the deficient pixel map use to in correction
required_good_count : int
how many neighboring pixels must not be deficient to correct a pixel,
if fewer than that many pixels are good neighbors then the box expands
max_window_size : int
the width of the max window
method : str
either "mean" or "median" depending on which measure should fill the deficient pixel
Returns
-------
NDCube
A background subtracted data frame
"""
if deficient_pixel_map_path is None:
output_object = data
output_object.meta.history.add_now("LEVEL1-remove_deficient_pixels",
"Remove deficient pixels skipped since path is empty")
else:
deficient_pixel_map = load_ndcube_from_fits(deficient_pixel_map_path)
deficient_pixel_array = deficient_pixel_map.data
output_object = remove_deficient_pixels(data, deficient_pixel_array,
required_good_count=required_good_count,
max_window_size=max_window_size,
method=method)
output_object.meta["CALPM"] = os.path.basename(deficient_pixel_map_path)
output_object.meta.history.add_now("LEVEL1-remove_deficient_pixels", "deficient pixels removed")
return output_object
[docs]
def create_all_valid_deficient_pixel_map(data: NDCube) -> np.ndarray:
"""
Create valid deficient pixel map.
Parameters
----------
data: NDCube
NDCube being modified
Returns
-------
np.ndarray
Array with all data points set to 1, representing the valid deficient pixel map.
"""
return np.ones(data.data.shape, dtype=bool)