Source code for punchbowl.level1.sqrt

import os.path
import warnings
from math import ceil

import numpy as np
from ndcube import NDCube

from punchbowl.exceptions import DataValueWarning
from punchbowl.prefect import punch_task

TABLE_PATH = os.path.dirname(__file__) + "/decoding_tables/"


[docs] def decode_sqrt( data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12, ccd_gain_bottom: float = 4.9, ccd_gain_top: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, overwrite_table: bool = False, ) -> np.ndarray: """ Square root decode between specified bitrate values. Parameters ---------- data Input encoded data array from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain_bottom CCD gain (bottom side of CCD) [e / DN] ccd_gain_top CCD gain (top side of CCD) [e / DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] overwrite_table Toggle to regenerate and overwrite existing decoding table Returns ------- np.ndarray Square root decoded version of the input image """ table_name_bottom = ( TABLE_PATH + "tab_fb" + str(from_bits) + "_tb" + str(to_bits) + "_g" + str(ccd_gain_bottom) + "_b" + str(ccd_offset) + "_r" + str(ccd_read_noise) + ".npy" ) table_name_top = ( TABLE_PATH + "tab_fb" + str(from_bits) + "_tb" + str(to_bits) + "_g" + str(ccd_gain_top) + "_b" + str(ccd_offset) + "_r" + str(ccd_read_noise) + ".npy" ) if not os.path.isfile(table_name_bottom) or overwrite_table: table_bottom = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_bottom, ccd_offset, ccd_read_noise) if not os.path.isdir(TABLE_PATH): os.makedirs(TABLE_PATH, exist_ok=True) np.save(table_name_bottom, table_bottom) else: table_bottom = np.load(table_name_bottom) if not os.path.isfile(table_name_top) or overwrite_table: table_top = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_top, ccd_offset, ccd_read_noise) if not os.path.isdir(TABLE_PATH): os.makedirs(TABLE_PATH, exist_ok=True) np.save(table_name_top, table_top) else: table_top = np.load(table_name_top) if isinstance(data, np.ndarray): out_data = np.empty_like(data) out_data[0:data.shape[1]//2,:] = decode_sqrt_by_table(data[0:data.shape[1]//2,:], table_bottom) out_data[data.shape[1]//2:,:] = decode_sqrt_by_table(data[data.shape[1]//2:,:], table_top) else: out_data = decode_sqrt_by_table(data, table_bottom) return out_data
[docs] def encode_sqrt(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray: """ Square root encode between specified bitrate values. Parameters ---------- data Input data array from_bits Specified bitrate of original input image to_bits Specified bitrate of output encoded image Returns ------- np.ndarray Encoded version of input data """ data = np.round(data).astype(np.int32).clip(0, None) factor = np.array(2 ** (2 * to_bits - from_bits)) data_scaled_by_factor = np.round(data * factor).astype(np.int32) return np.floor(np.sqrt(data_scaled_by_factor)).astype(np.int32)
[docs] def decode_sqrt_simple(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray: """ Perform a simple decoding using the naive squaring strategy. Parameters ---------- data Input data array from_bits Specified bitrate of original input image to_bits Specified bitrate of output encoded image Returns ------- np.ndarray Decoded version of input data """ data = np.round(data).astype(np.int32).clip(0, None) factor = 2.0 ** (2 * to_bits - from_bits) return np.round(np.square(data) / factor).astype(np.int32)
[docs] def noise_pdf( data_value: np.ndarray | float, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, n_sigma: int = 5, n_steps: int = 10000, ) -> tuple: """ Generate a probability distribution function (pdf) from an input data value. Parameters ---------- data_value Input data value ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] n_sigma Number of sigma steps n_steps Number of data steps Returns ------- np.ndarray Data step distribution normal Data normal distribution """ # Use camera calibration to get an e-count electrons = np.clip((data_value - ccd_offset) * ccd_gain, 1, None) # Shot noise, converted back to DN poisson_sigma = np.sqrt(electrons) / ccd_gain # Total sigma is quadrature sum of fixed & shot sigma = np.sqrt(poisson_sigma**2 + ccd_read_noise**2) dn_steps = np.arange(-n_sigma * sigma, n_sigma * sigma, sigma * n_sigma * 2 / n_steps) # Explicitly calculate the Gaussian/normal PDF at each step normal = np.exp(-dn_steps * dn_steps / sigma / sigma / 2) # Easier to normalize numerically than to account for missing tails normal = normal / np.sum(normal) return data_value + dn_steps, normal
[docs] def mean_b_offset( data_value: float, from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, ) -> float: """ Compute an offset from the naive and robust decoding processes. Parameters ---------- data_value Input data value [DN] from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] Returns ------- float Generated decoding value for use in constructing a decoding table """ naive_decoded_value = decode_sqrt_simple(data_value, from_bits, to_bits) # Generate distribution around naive value (values, weights) = noise_pdf(naive_decoded_value, ccd_gain, ccd_offset, ccd_read_noise) # Ignore values below the offset -- which break the noise model weights = weights * (values >= ccd_offset) if np.sum(weights) < 0.95: return 0 weights = weights / np.sum(weights) # Encode the entire value distribution data_values = encode_sqrt(values, from_bits, to_bits) # Decode the entire value distribution to find the net offset net_offset = decode_sqrt_simple(data_values, from_bits, to_bits) # Expected value of the entire distribution expected_value = np.sum(net_offset * weights) # Return ΔB. return expected_value - naive_decoded_value
[docs] def decode_sqrt_corrected( data_value: float, from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, ) -> float: """ Compute an individual decoding value for an input data value. Parameters ---------- data_value Input data value [DN] from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] Returns ------- float Generated decoding value for use in constructing a decoding table """ s1p = decode_sqrt_simple(data_value + 1, from_bits, to_bits) s1n = decode_sqrt_simple(data_value - 1, from_bits, to_bits) width = (s1p - s1n) / 4 fixed_sigma = np.sqrt(ccd_read_noise**2 + width**2) of = mean_b_offset(data_value, from_bits, to_bits, ccd_gain, ccd_offset, fixed_sigma) return decode_sqrt_simple(data_value, from_bits, to_bits) - of
[docs] def generate_decode_sqrt_table( from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, first_order: bool = False, ) -> np.ndarray: """ Generate a square root decode table between specified bitrate values and CCD parameters. Parameters ---------- from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] first_order Toggle for first order square root decoding table only, defaults to False Returns ------- table Generated square root decoding table """ table = np.zeros(ceil(2**to_bits)) for i in range(table.size): table[i] = decode_sqrt_corrected(i, from_bits, to_bits, ccd_gain, ccd_offset, ccd_read_noise) if first_order: return table table0 = decode_sqrt_simple(np.arange(0,2**to_bits,1),from_bits,to_bits) table2 = np.copy(table) table2[1:-2] = table[1:-2] + 0.75 * ((table0-table)[0:-3]+(table0-table)[2:-1]-2*(table0-table)[1:-2]) return table2
[docs] def decode_sqrt_by_table(data: np.ndarray | float, table: np.ndarray) -> np.ndarray: """ Generate a square root decode table between specified bitrate values and CCD parameters. Parameters ---------- data Input encoded data array table Square root decoding table Returns ------- np.ndarray Decoded version of input data """ rounded_data = np.round(data).astype(np.int32) if np.any(rounded_data > table.shape[0]): warnings.warn("Data values exceeded square-root encoding lookup table.", DataValueWarning) data = rounded_data.clip(0, table.shape[0]-1) return table[data]
[docs] @punch_task def decode_sqrt_data(data_object: NDCube, overwrite_table: bool = False) -> NDCube: """ Prefect task in the pipeline to decode square root encoded data. Parameters ---------- data_object : NDCube the object you wish to decode overwrite_table Toggle to regenerate and overwrite existing decoding table Returns ------- NDCube a modified version of the input with the data square root decoded """ data = data_object.data from_bits = data_object.meta["RAWBITS"].value to_bits = data_object.meta["COMPBITS"].value ccd_gain_bottom = data_object.meta["GAINBTM"].value ccd_gain_top = data_object.meta["GAINTOP"].value ccd_offset = data_object.meta["OFFSET"].value or 400 # in the case it's not provided, we use 400 ccd_read_noise = 17 # DN # TODO: make this not a hardcoded value! decoded_data = decode_sqrt( data, from_bits=from_bits, to_bits=to_bits, ccd_gain_bottom=ccd_gain_bottom, ccd_gain_top=ccd_gain_top, ccd_offset=ccd_offset, ccd_read_noise=ccd_read_noise, overwrite_table=overwrite_table, ) decoded_saturation_value = decode_sqrt( data_object.meta["DSATVAL"].value, from_bits=from_bits, to_bits=to_bits, ccd_gain_bottom=ccd_gain_bottom, ccd_gain_top=ccd_gain_top, ccd_offset=ccd_offset, ccd_read_noise=ccd_read_noise, overwrite_table=overwrite_table, ) data_object.meta["DSATVAL"] = decoded_saturation_value data_object.data[...] = decoded_data[...] # TODO: we need to make sure the data type changes? data_object.meta.history.add_now("LEVEL0-decode-sqrt", "image square root decoded") return data_object