Source code for punchbowl.level1.vignette

import os
import pathlib
import warnings
from collections.abc import Callable

import numpy as np
from astropy.wcs import WCS
from ndcube import NDCube
from reproject import reproject_adaptive

from punchbowl.data import load_ndcube_from_fits
from punchbowl.exceptions import (
    IncorrectPolarizationStateWarning,
    IncorrectTelescopeWarning,
    InvalidDataError,
    LargeTimeDeltaWarning,
    NoCalibrationDataWarning,
)
from punchbowl.prefect import punch_task


[docs] @punch_task def correct_vignetting_task(data_object: NDCube, vignetting_path: str | pathlib.Path | Callable | None) -> NDCube: """ Prefect task to correct the vignetting of an image. Vignetting is a reduction of an image's brightness or saturation toward the periphery compared to the image center, created by the optical path. The Vignetting Module will transform the data through a flat-field correction map, to cancel out the effects of optical vignetting created by distortions in the optical path. This module also corrects detector gain variation and offset. Correction maps will be 2048*2048 arrays, to match the input data, and built using the starfield brightness pattern. Mathematical Operation: I'_{i,j} = I_i,j / FF_{i,j} Where I_{i,j} is the number of counts in pixel i, j. I'_{i,j} refers to the modified value. FF_{i,j} is the small-scale flat field factor for pixel i, j. The correction mapping will take into account the orientation of the spacecraft and its position in the orbit. Uncertainty across the image plane is calculated using the modelled flat-field correction with stim lamp calibration data. Deviations from the known flat-field are used to calculate the uncertainty in a given pixel. The uncertainty is convolved with the input uncertainty layer to produce the output uncertainty layer. Parameters ---------- data_object : PUNCHData data on which to operate vignetting_path : pathlib path to vignetting function to apply to input data Returns ------- PUNCHData modified version of the input with the vignetting corrected """ if vignetting_path is None: data_object.meta.history.add_now("LEVEL1-correct_vignetting", "Vignetting skipped") msg=f"Calibration file {vignetting_path} is unavailable, vignetting correction not applied" warnings.warn(msg, NoCalibrationDataWarning) else: if isinstance(vignetting_path, Callable): vignetting_function, vignetting_path = vignetting_path() else: if isinstance(vignetting_path, str): vignetting_path = pathlib.Path(vignetting_path) if not vignetting_path.exists(): msg = f"File {vignetting_path} does not exist." raise InvalidDataError(msg) vignetting_function = load_ndcube_from_fits(vignetting_path, include_provenance=False) vignetting_function_date = vignetting_function.meta.astropy_time observation_date = data_object.meta.astropy_time if abs((vignetting_function_date - observation_date).to("day").value) > 14: msg = f"Calibration file {vignetting_path} contains data created greater than 2 weeks from the obsveration" warnings.warn(msg, LargeTimeDeltaWarning) if vignetting_function.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value: msg = f"Incorrect TELESCOP value within {vignetting_path}" warnings.warn(msg, IncorrectTelescopeWarning) if vignetting_function.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value: msg = f"Incorrect polarization state within {vignetting_path}" warnings.warn(msg, IncorrectPolarizationStateWarning) if vignetting_function.data.shape != data_object.data.shape: msg = f"Incorrect vignetting function shape within {vignetting_path}" raise InvalidDataError(msg) data_object.data[:, :] /= vignetting_function.data[:, :] data_object.uncertainty.array[:, :] /= vignetting_function.data[:, :] data_object.meta.history.add_now("LEVEL1-correct_vignetting", f"Vignetting corrected using {os.path.basename(str(vignetting_path))}") return data_object
[docs] def generate_vignetting_calibration(path_vignetting: str, path_mask: str, spacecraft: str, vignetting_threshold: float = 1.2, rows_ignore: tuple = (13,15), rows_adjust: tuple = (15,16), rows_adjust_source: tuple = (16,20)) -> np.ndarray: """ Create calibration data for vignetting. Parameters ---------- path_vignetting : str path to raw input vignetting function path_mask : str path to spacecraft mask function spacecraft : str spacecraft number vignetting_threshold : float, optional threshold for bad vignetting pixels, by default 1.2 rows_ignore : tuple, optional rows to exclude entirely from original vignetting data, by default (13,15) for 128x128 input rows_adjust : tuple, optional rows to adjust to the minimum of a set of rows above (per column), by default (15,16) for 128x128 input rows_adjust_source : tuple, optional rows to use for statistics to adjust vignetting rows as above, by default (16,20) for 128x128 input Returns ------- np.ndarray vignetting function array """ if spacecraft in ["1", "2", "3"]: with open(path_vignetting) as f: lines = f.readlines() with open(path_mask, "rb") as f: byte_array = f.read() mask = np.unpackbits(np.frombuffer(byte_array, dtype=np.uint8)).reshape(2048,2048) mask = mask.T num_bins, bin_size = lines[0].split() num_bins = int(num_bins) bin_size = int(bin_size) values = np.array([float(v) for line in lines[1:] for v in line.split()]) vignetting = values[:num_bins**2].reshape((num_bins, num_bins)) vignetting[vignetting > vignetting_threshold] = np.nan vignetting[rows_ignore[0]:rows_ignore[1],:] = np.nan vignetting[rows_adjust[0]:rows_adjust[1],:] = np.min(vignetting[rows_adjust_source[0]:rows_adjust_source[1],:], axis=0) wcs_vignetting = WCS(naxis=2) wcs_wfi = WCS(naxis=2) wcs_wfi.wcs.cdelt = wcs_wfi.wcs.cdelt * vignetting.shape[0] / 2048. vignetting_reprojected = reproject_adaptive((vignetting, wcs_vignetting), shape_out=(2048,2048), output_projection=wcs_wfi, boundary_mode="ignore", bad_value_mode="ignore", return_footprint=False) vignetting_reprojected = vignetting_reprojected * mask vignetting_reprojected[mask == 0] = 1 return vignetting_reprojected if spacecraft=="4": # TODO: implement NFI speckle inclusion return np.ones((2048,2048)) raise RuntimeError(f"Unknown spacecraft {spacecraft}")