import numpy as np
from ndcube import NDCube
from prefect import get_run_logger
from scipy.ndimage import gaussian_filter
from skimage.morphology import binary_dilation
from punchbowl.data import load_ndcube_from_fits
from punchbowl.prefect import punch_task
[docs]
def run_zspike(
data: np.ndarray,
uncertainty: np.ndarray,
threshold: float = 4,
required_yes: int = 6,
veto_limit: int = 2,
diff_method: str = "sigma",
dilation: int = 0,
blur_scale: float = 1.0,
blur_threshold: float | None = 0.15,
index_of_interest: int | None = None,
) -> np.ndarray:
"""
Identify bright structures in temporal series of images.
Given a time sequence of images, identify "spikes" that exceed a
threshold in a single frame.
Diffuse bright structures over the background F-corona are
identified and marked in the data using in-band "bad value" marking
(which is supported by the FITS standard). Data marking follows the
existing ZSPIKE temporal despiking algorithm to identify auroral
transients.
This is a voting algorithm based on ZSPIKE.PRO from the 1990s,
which is available in the solarsoft distribution from Lockheed Martin.
ZSPIKE was originally used to identify cosmic rays, and
was adopted on STEREO for on-board despiking during exposure accumulation.
For this application ZSPIKE is ideal because it does not rely on the
spatial structure of spikes, only their temporal structure. Both cosmic
rays and, if present, high-altitude aurora are transient and easily
detected with ZSPIKE.
The algorithm assembles "votes" from the images
surrounding each one in a stream, to determine whether a particular pixel
is a good candidate for a temporal spike. If the pixel is sufficiently
bright compared to its neighbors in time, it is marked bad. "Bad values"
are stored in the DRP for file quality marking outlined in Module Quality
Marking.
There are two methods of identifying if a pixel is above a given threshold,
and therefore considered a spike. diff_method='abs' or 'sigma'.
diff_method 'abs' represents the absolute difference, and is the default.
If set, this is an absolute difference, in DN, required for a pixel to
'vote' its central value. If the central value is this much higher than a
given voting value, then the central value is voted to be a spike. If
it's this much lower, the veto count is incremented.
If diff_method 'sigma' is set if, then each pixel is treated as a
time series and the calculated sigma (RMS variation from the mean) of
the timeseries is used to calculate a difference threshold at each
location.
The threshold is the value over which a pixel is voted as a spike. The threshold
should be different depending diff_method.
Parameters
----------
data : np.ndarray
data to operate on - this is expected to have an odd number of frames, or
a frame_of_interest should be inserted.
uncertainty: np.ndarray
data to operate on - this is expected to have an odd number of frames, or
a frame_of_interest should be inserted.
threshold : float
This is the threshold over which a pixel is voted as a spike.
required_yes: int
(default is 4) - number of 'voting' frames that must vote the
value is a spike, for it to be marked as such.
veto_limit: int
(default is 2) - number of 'voting' frames that must vote NO to veto
marking the central value as a spike.
diff_method: str
This is the method by which the threshold is set. 'abs' treats each pixel
independently and finds the absolute difference, and 'sigma' treats each
corresponding pixel as a time series and the calculated RMS variation
from the mean of the timeseries is used to calculate a difference at each
location.
dilation: int
If nonzero, this is the number of times to morphologically dilate pixels marked as bright structures.
blur_scale: float
The scale of the gaussian kernel used for convolving with the mask for blurring.
blur_threshold: float
The threshold value used to cast a blurred mask array back to a boolean data type.
index_of_interest : int
if you have an even number of frames, or you don't want to find spikes
in the center frame, the frame_of_interest will be used. index_of_interest
starts from 0, therefore the center frame from 21 frames will be 10.
Returns
-------
np.ndarray
a frame matching the dimensions of the frame of interest with True flagging
bad pixels, and False flagging good data points
"""
if len(data.shape) != 3:
msg = "`data` must be a 3-D array"
raise ValueError(msg)
# test if odd number of frames, or if a frame of interest has been included
z_shape = np.shape(data[:, 0, 0])
if z_shape[0] % 2 == 0:
if index_of_interest is None:
msg = "Number of frames in `data` must be odd or have `frame_of_interest` set."
raise ValueError(msg)
elif index_of_interest is None:
index_of_interest = z_shape[0] // 2
frame_of_interest = data[index_of_interest, :, :]
frame_of_interest_uncertainty = uncertainty[index_of_interest, :, :]
voters_array = np.delete(data, index_of_interest, 0)
voters_array_uncertainty = np.delete(uncertainty, index_of_interest, 0)
# create a reference cube the same dimensions as the voter cube, filled with frame of interest
reference_cube = np.stack([frame_of_interest for _ in range(voters_array.shape[0])], axis=0)
difference_array = np.abs(reference_cube - voters_array)
if diff_method == "abs":
threshold_array = np.full(voters_array.shape, threshold)
elif diff_method == "sigma":
threshold_array = threshold * np.nanstd(voters_array, axis=0, where=voters_array_uncertainty < 1.0)
else:
msg = f"A `diff_method` of `sigma` or `abs` is expected. Found diff_method={diff_method}."
raise ValueError(msg)
yes_vote_count = np.sum(difference_array > threshold_array, axis=0)
no_vote_count = np.full(frame_of_interest.shape, voters_array.shape[0]) - yes_vote_count
flagged_features_array = yes_vote_count > required_yes
# if the number of no votes exceeds a veto limit, then veto
veto_mask = no_vote_count > veto_limit
flagged_features_array[veto_mask] = False
# if input data already has an unbounded uncertainty, create a True flag
flagged_features_array = np.where(np.isinf(frame_of_interest_uncertainty), True, flagged_features_array)
# expand flags by dilating to remove holes or fuzzy edges
for _ in range(dilation):
binary_dilation(flagged_features_array, out=flagged_features_array)
# blur and threshold the mask
if blur_scale is not None:
flagged_features_array = gaussian_filter(flagged_features_array.astype(float),
sigma=blur_scale) > blur_threshold
return flagged_features_array
[docs]
@punch_task
def identify_bright_structures_task(
data: NDCube | None,
voter_filenames: list[str],
threshold: float = 4,
required_yes: int = 6,
veto_limit: int = 2,
diff_method: str = "sigma",
dilation: int = 0,
) -> NDCube | None:
"""Prefect task to perform bright structure identification."""
logger = get_run_logger()
logger.info("identify_bright_structures_task started")
if data is None:
logger.info("Identify bright structures skipped since data is None")
return data
if len(voter_filenames) == 0:
logger.info("Identify bright structures skipped since no voters provided")
data.meta.history.add_now("LEVEL2-bright_structure",
"Identify bright structures skipped since no voters provided")
return data
# construct voter cube
voters, voters_uncertainty = [], []
for voter_filename in voter_filenames:
this_punchdata = load_ndcube_from_fits(voter_filename)
voters.append(this_punchdata.data)
voters_uncertainty.append(this_punchdata.uncertainty)
# add frame of interest
voters.append(data.data)
voters_uncertainty.append(data.uncertainty)
# apply find spikes
spike_mask = run_zspike(
data=np.array(voters),
uncertainty=np.array(voters_uncertainty),
threshold=threshold,
required_yes=required_yes,
veto_limit=veto_limit,
diff_method=diff_method,
dilation=dilation,
index_of_interest=-1)
# add the uncertainty to the output punch data object
data.uncertainty.array[spike_mask] = np.inf
logger.info("identify_bright_structures_task ended")
data.meta.history.add_now("LEVEL2-bright_structures",
"bright structure identification completed")
return data