import os
import pathlib
import warnings
from pathlib import Path
from datetime import UTC, datetime
import numpy as np
from astropy.io import fits
from astropy.wcs import WCS
from ndcube import NDCube
from reproject import reproject_adaptive
from scipy.ndimage import binary_dilation, binary_erosion, grey_closing
from punchbowl.data import load_ndcube_from_fits
from punchbowl.data.meta import NormalizedMetadata
from punchbowl.data.punch_io import get_base_file_name
from punchbowl.exceptions import (
IncorrectPolarizationStateWarning,
IncorrectTelescopeWarning,
InvalidDataError,
LargeTimeDeltaWarning,
NoCalibrationDataWarning,
)
from punchbowl.level1.sqrt import decode_sqrt_data
from punchbowl.prefect import punch_task
from punchbowl.util import DataLoader, interpolate_data, load_mask_file
[docs]
def _load_vignetting_function(vignetting_path: str | pathlib.Path | DataLoader | None) -> NDCube:
if isinstance(vignetting_path, DataLoader):
vignetting_function = vignetting_path.load()
vignetting_path = vignetting_path.src_repr()
else:
if isinstance(vignetting_path, str):
vignetting_path = pathlib.Path(vignetting_path)
if not vignetting_path.exists():
msg = f"File {vignetting_path} does not exist."
raise InvalidDataError(msg)
vignetting_function = load_ndcube_from_fits(vignetting_path, include_provenance=False)
return vignetting_function, vignetting_path
[docs]
@punch_task
def correct_vignetting_task(data_object: NDCube, # noqa: C901
vignetting_path: str | pathlib.Path | DataLoader | None,
second_vignetting_path: str | pathlib.Path | DataLoader | None = None,
allow_extrapolation: bool = False) -> NDCube:
"""
Prefect task to correct the vignetting of an image.
Vignetting is a reduction of an image's brightness or saturation toward the
periphery compared to the image center, created by the optical path. The
Vignetting Module will transform the data through a flat-field correction
map, to cancel out the effects of optical vignetting created by distortions
in the optical path. This module also corrects detector gain variation and
offset.
Correction maps will be 2048*2048 arrays, to match the input data, and
built using the starfield brightness pattern. Mathematical Operation:
I'_{i,j} = I_i,j / FF_{i,j}
Where I_{i,j} is the number of counts in pixel i, j. I'_{i,j} refers to the
modified value. FF_{i,j} is the small-scale flat field factor for pixel i,
j. The correction mapping will take into account the orientation of the
spacecraft and its position in the orbit.
Uncertainty across the image plane is calculated using the modelled
flat-field correction with stim lamp calibration data. Deviations from the
known flat-field are used to calculate the uncertainty in a given pixel.
The uncertainty is convolved with the input uncertainty layer to produce
the output uncertainty layer.
Parameters
----------
data_object : PUNCHData
data on which to operate
vignetting_path : str | Path | DataLoader
path to vignetting function to apply to input data
second_vignetting_path : str | Path | DataLoader | None
if provided, the two vignetting functions will be interpolated between
allow_extrapolation : bool
if second_vignetting_path is provided, whether to allow extrapolation beyond the two vignetting files. If False
and data_object's date_obs is outside this range, the interpolation will be "clamped" to one of the two files,
rather than extrapolating.
Returns
-------
PUNCHData
modified version of the input with the vignetting corrected
"""
if vignetting_path is None:
data_object.meta.history.add_now("LEVEL1-correct_vignetting", "Vignetting skipped")
msg=f"Calibration file {vignetting_path} is unavailable, vignetting correction not applied"
warnings.warn(msg, NoCalibrationDataWarning)
else:
vignetting_function, vignetting_path = _load_vignetting_function(vignetting_path)
if second_vignetting_path is None:
second_vignetting_function = None
else:
second_vignetting_function, second_vignetting_path = _load_vignetting_function(second_vignetting_path)
for function, path in ([vignetting_function, vignetting_path],
[second_vignetting_function, second_vignetting_path]):
if function is None:
continue
vignetting_function_date = function.meta.astropy_time
observation_date = data_object.meta.astropy_time
if abs((vignetting_function_date - observation_date).to("day").value) > 14:
msg = f"Calibration file {path} contains data created greater than 2 weeks from the observation"
warnings.warn(msg, LargeTimeDeltaWarning)
if function.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value:
msg = f"Incorrect TELESCOP value within {path}"
warnings.warn(msg, IncorrectTelescopeWarning)
if function.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value:
msg = f"Incorrect polarization state within {path}"
warnings.warn(msg, IncorrectPolarizationStateWarning)
if function.data.shape != data_object.data.shape:
msg = f"Incorrect vignetting function shape within {path}"
raise InvalidDataError(msg)
history_message = f"Vignetting corrected using {os.path.basename(str(vignetting_path))}"
if second_vignetting_function is not None:
if second_vignetting_function.meta.astropy_time < vignetting_function.meta.astropy_time:
vignetting_function, second_vignetting_function = second_vignetting_function, vignetting_function
if not allow_extrapolation and data_object.meta.astropy_time < vignetting_function.meta.astropy_time:
msg = "Data is before first vignetting function and extrapolation is not allowed; clamping."
warnings.warn(msg)
final_vignetting = vignetting_function.data
elif (not allow_extrapolation
and data_object.meta.astropy_time > second_vignetting_function.meta.astropy_time):
msg = "Data is after second vignetting function and extrapolation is not allowed; clamping."
warnings.warn(msg)
final_vignetting = second_vignetting_function.data
history_message = f"Vignetting corrected using {os.path.basename(str(second_vignetting_path))}"
else:
final_vignetting = interpolate_data(vignetting_function, second_vignetting_function,
data_object.meta.datetime,
allow_extrapolation=allow_extrapolation)
history_message += f" and {os.path.basename(str(second_vignetting_path))}"
else:
final_vignetting = vignetting_function.data
data_object.data[:, :] /= final_vignetting[:, :]
data_object.uncertainty.array[:, :] /= final_vignetting[:, :]
data_object.meta.history.add_now("LEVEL1-correct_vignetting", history_message)
return data_object
[docs]
def generate_vignetting_calibration_wfi(path_vignetting: str,
path_mask: str,
spacecraft: str,
vignetting_threshold: float = 1.2,
rows_ignore: tuple = (13,15),
rows_adjust: tuple = (15,16),
rows_adjust_source: tuple = (16,20),
mask_erosion: tuple = (6,6)) -> np.ndarray:
"""
Create calibration data for vignetting.
Parameters
----------
path_vignetting : str
path to raw input vignetting function
path_mask : str
path to spacecraft mask function
spacecraft : str
spacecraft number
vignetting_threshold : float, optional
threshold for bad vignetting pixels, by default 1.2
rows_ignore : tuple, optional
rows to exclude entirely from original vignetting data, by default (13,15) for 128x128 input
rows_adjust : tuple, optional
rows to adjust to the minimum of a set of rows above (per column), by default (15,16) for 128x128 input
rows_adjust_source : tuple, optional
rows to use for statistics to adjust vignetting rows as above, by default (16,20) for 128x128 input
mask_erosion: tuple, optional
kernel to use in erosion operation to reduce the mask applied to the vignetting function, by default (6,6)
Returns
-------
np.ndarray
vignetting function array
"""
if spacecraft in ["1", "2", "3"]:
if not os.path.exists(path_vignetting):
return np.ones((2048,2048))
with open(path_vignetting) as f:
lines = f.readlines()
with open(path_mask, "rb") as f:
byte_array = f.read()
mask = np.unpackbits(np.frombuffer(byte_array, dtype=np.uint8)).reshape(2048,2048)
mask = mask.T
num_bins, bin_size = lines[0].split()
num_bins = int(num_bins)
bin_size = int(bin_size)
values = np.array([float(v) for line in lines[1:] for v in line.split()])
vignetting = values[:num_bins**2].reshape((num_bins, num_bins))
vignetting[vignetting > vignetting_threshold] = np.nan
vignetting[rows_ignore[0]:rows_ignore[1],:] = np.nan
vignetting[rows_adjust[0]:rows_adjust[1],:] = np.min(vignetting[rows_adjust_source[0]:rows_adjust_source[1],:],
axis=0)
wcs_vignetting = WCS(naxis=2)
wcs_wfi = WCS(naxis=2)
wcs_wfi.wcs.cdelt = wcs_wfi.wcs.cdelt * vignetting.shape[0] / 2048.
vignetting_reprojected = reproject_adaptive((vignetting, wcs_vignetting),
shape_out=(2048,2048),
output_projection=wcs_wfi,
boundary_mode="ignore",
bad_value_mode="ignore",
return_footprint=False)
mask = binary_erosion(mask, structure=np.ones(mask_erosion))
vignetting_reprojected = vignetting_reprojected * mask
vignetting_reprojected[mask == 0] = 1
return vignetting_reprojected
if spacecraft=="4":
raise RuntimeError("Please use the NFI vignetting generator function.")
raise RuntimeError(f"Unknown spacecraft {spacecraft}")
[docs]
def generate_vignetting_calibration_nfi(input_files: list[str], # noqa: C901
path_speckle: str,
path_mask: str,
path_dark: str,
polarizer: str,
dateobs: str,
version: str,
output_path: str | None = None,
max_files: int = -1) -> np.ndarray | str:
"""
Create calibration data for vignetting for the NFI spacecraft.
Parameters
----------
input_files : list[str]
Paths to input NFI files for processing
path_speckle : str
Path to the speckle mask FITS file
path_mask : str
Path to the NFI mask bin file
path_dark : str
Path to the dark frame FITS file
polarizer : str
Polarizer name
dateobs : str
Timestamp for calibration file
version : str
File version
output_path : str | None
Path to calibration file output
max_files : int
If set, only the first this many non-outlier files will be loaded and used
Returns
-------
np.ndarray | str
vignetting function array or written file path
"""
if input_files is None:
return np.ones((2048,2048))
# Load speckle mask and dark frame
with fits.open(path_speckle) as hdul:
specklemask = np.fliplr(hdul[0].data)
with fits.open(path_dark) as hdul:
nfidark = hdul[1].data
# Load and square root decode input data
cubes = []
for file in input_files:
try:
cube = load_ndcube_from_fits(file)
except: # noqa: E722
print(f"Error loading {file}") # noqa: T201
continue
if cube.meta["OFFSET"].value is None:
cube.meta["OFFSET"] = 400
# Reject outlier images in vignetting calculation if they have been flagged
if "OUTLIER" in cube.meta:
if cube.meta["OUTLIER"].value == 0:
cubes.append(decode_sqrt_data.fn(cube))
# If outlier rejection was not applied, manually reject outliers using these human-derived checks
else:
if 490 <= cube.meta["DATAMDN"].value <= 655 and cube.meta["DATAP99"].value != 4095:
cubes.append(decode_sqrt_data.fn(cube))
if max_files > 0 and len(cubes) >= max_files:
break
# Subtract dark frame
for cube in cubes:
cube.data[...] -= nfidark
# Build speckle boundary mask
inverted_mask = 1 - specklemask
dilated = binary_dilation(inverted_mask, structure=np.ones((3, 3)))
boundary_mask = dilated & (~inverted_mask)
# Stack image data
images = np.array([cube.data for cube in cubes])
# If only one file exists in the datacube, do not create the vignetting correction
if len(images.shape) != 3:
return None
applied_images = images * boundary_mask
applied_speck = images * specklemask
# Compute averages and construct flatfield
avg_images = np.nanmean(applied_images, axis=0)
avg_img_darkremoved = np.nanmean(images, axis=0)
avg_speck = np.nanmean(applied_speck, axis=0)
avg_speckfilled = grey_closing(avg_images, structure=np.ones((7, 7)))
nficlean = avg_speckfilled * inverted_mask + avg_speck
nfiflat = avg_img_darkremoved / nficlean
# Load spacecraft mask
mask_nfi = load_mask_file(path_mask)
# Deal with infs and remask
nfiflat[np.isinf(nfiflat)] = 1.
nfiflat[mask_nfi == 0] = 1
# Generate an output metadata and NDCube
m = NormalizedMetadata.load_template(f"G{polarizer}4", "1")
m["DATE-OBS"] = dateobs
m["FILEVRSN"] = version
m["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
cube = NDCube(data=nfiflat.astype("float32"), wcs=cube.wcs, meta=m)
if output_path is not None:
filename = Path(output_path) / f"{get_base_file_name(cube)}.fits"
full_header = cube.meta.to_fits_header(wcs=cube.wcs)
full_header["FILENAME"] = os.path.basename(filename)
hdu_data = fits.ImageHDU(data=cube.data,
header=full_header,
name="Primary data array")
hdu_provenance = fits.BinTableHDU.from_columns(fits.ColDefs([fits.Column(
name="provenance", format="A40", array=np.char.array(cube.meta.provenance))]))
hdu_provenance.name = "File provenance"
hdul = cube.wcs.to_fits()
hdul[0] = fits.PrimaryHDU()
hdul.insert(1, hdu_data)
hdul.append(hdu_provenance)
hdul.writeto(filename, overwrite=True, checksum=True)
hdul.close()
return filename
return cube.data