Source code for punchbowl.level1.sqrt

import os.path
import warnings
from math import ceil
from datetime import datetime

import numpy as np
from ndcube import NDCube

from punchbowl.exceptions import DataValueWarning
from punchbowl.prefect import punch_task

TABLE_PATH = os.path.dirname(__file__) + "/decoding_tables/"


[docs] def decode_sqrt( data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12, ccd_gain_bottom: float = 4.9, ccd_gain_top: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, overwrite_table: bool = False, reconstitute_noise: bool = True, dateobs: datetime | None = None, ) -> np.ndarray: """ Square root decode between specified bitrate values. Parameters ---------- data Input encoded data array from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain_bottom CCD gain (bottom side of CCD) [e / DN] ccd_gain_top CCD gain (top side of CCD) [e / DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] overwrite_table Toggle to regenerate and overwrite existing decoding table reconstitute_noise If True, adds appropriate noise after decoding. Otherwise, only square root decodes. dateobs The reference time to use as the random noise seed. If None, then the result is random and not reproducible. Returns ------- np.ndarray Square root decoded version of the input image """ table_name_bottom = ( TABLE_PATH + "tab_fb" + str(from_bits) + "_tb" + str(to_bits) + "_g" + str(ccd_gain_bottom) + "_b" + str(ccd_offset) + "_r" + str(ccd_read_noise) + ".npy" ) table_name_top = ( TABLE_PATH + "tab_fb" + str(from_bits) + "_tb" + str(to_bits) + "_g" + str(ccd_gain_top) + "_b" + str(ccd_offset) + "_r" + str(ccd_read_noise) + ".npy" ) if not os.path.isfile(table_name_bottom) or overwrite_table: table_bottom = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_bottom, ccd_offset, ccd_read_noise) if not os.path.isdir(TABLE_PATH): os.makedirs(TABLE_PATH, exist_ok=True) np.save(table_name_bottom, table_bottom) else: table_bottom = np.load(table_name_bottom) if not os.path.isfile(table_name_top) or overwrite_table: table_top = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_top, ccd_offset, ccd_read_noise) if not os.path.isdir(TABLE_PATH): os.makedirs(TABLE_PATH, exist_ok=True) np.save(table_name_top, table_top) else: table_top = np.load(table_name_top) if isinstance(data, np.ndarray): out_data = np.empty_like(data) out_data[0:data.shape[1]//2,:] = decode_sqrt_by_table(data[0:data.shape[1]//2,:], table_bottom) out_data[data.shape[1]//2:,:] = decode_sqrt_by_table(data[data.shape[1]//2:,:], table_top) else: out_data = decode_sqrt_by_table(data, table_bottom) if reconstitute_noise: scale = np.sqrt(out_data) / 2.2 if dateobs is None: noise = np.random.normal(scale=scale) else: seed = int(dateobs.timestamp()) rng = np.random.default_rng(seed) noise = rng.normal(scale=scale) return out_data + noise return out_data
[docs] def encode_sqrt(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray: """ Square root encode between specified bitrate values. Parameters ---------- data Input data array from_bits Specified bitrate of original input image to_bits Specified bitrate of output encoded image Returns ------- np.ndarray Encoded version of input data """ data = np.round(data).astype(np.int32).clip(0, None) factor = np.array(2 ** (2 * to_bits - from_bits)) data_scaled_by_factor = np.round(data * factor).astype(np.int32) return np.floor(np.sqrt(data_scaled_by_factor)).astype(np.int32)
[docs] def decode_sqrt_simple(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray: """ Perform a simple decoding using the naive squaring strategy. Parameters ---------- data Input data array from_bits Specified bitrate of original input image to_bits Specified bitrate of output encoded image Returns ------- np.ndarray Decoded version of input data """ data = np.round(data).astype(np.int32).clip(0, None) factor = 2.0 ** (2 * to_bits - from_bits) return np.round(np.square(data) / factor).astype(np.int32)
[docs] def noise_pdf( data_value: np.ndarray | float, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, n_sigma: int = 5, n_steps: int = 10000, ) -> tuple: """ Generate a probability distribution function (pdf) from an input data value. Parameters ---------- data_value Input data value ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] n_sigma Number of sigma steps n_steps Number of data steps Returns ------- np.ndarray Data step distribution normal Data normal distribution """ # Use camera calibration to get an e-count electrons = np.clip((data_value - ccd_offset) * ccd_gain, 1, None) # Shot noise, converted back to DN poisson_sigma = np.sqrt(electrons) / ccd_gain # Total sigma is quadrature sum of fixed & shot sigma = np.sqrt(poisson_sigma**2 + ccd_read_noise**2) dn_steps = np.arange(-n_sigma * sigma, n_sigma * sigma, sigma * n_sigma * 2 / n_steps) # Explicitly calculate the Gaussian/normal PDF at each step normal = np.exp(-dn_steps * dn_steps / sigma / sigma / 2) # Easier to normalize numerically than to account for missing tails normal = normal / np.sum(normal) return data_value + dn_steps, normal
[docs] def mean_b_offset( data_value: float, from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, ) -> float: """ Compute an offset from the naive and robust decoding processes. Parameters ---------- data_value Input data value [DN] from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] Returns ------- float Generated decoding value for use in constructing a decoding table """ naive_decoded_value = decode_sqrt_simple(data_value, from_bits, to_bits) # Generate distribution around naive value (values, weights) = noise_pdf(naive_decoded_value, ccd_gain, ccd_offset, ccd_read_noise) # Ignore values below the offset -- which break the noise model weights = weights * (values >= ccd_offset) if np.sum(weights) < 0.95: return 0 weights = weights / np.sum(weights) # Encode the entire value distribution data_values = encode_sqrt(values, from_bits, to_bits) # Decode the entire value distribution to find the net offset net_offset = decode_sqrt_simple(data_values, from_bits, to_bits) # Expected value of the entire distribution expected_value = np.sum(net_offset * weights) # Return ΔB. return expected_value - naive_decoded_value
[docs] def decode_sqrt_corrected( data_value: float, from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, ) -> float: """ Compute an individual decoding value for an input data value. Parameters ---------- data_value Input data value [DN] from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] Returns ------- float Generated decoding value for use in constructing a decoding table """ s1p = decode_sqrt_simple(data_value + 1, from_bits, to_bits) s1n = decode_sqrt_simple(data_value - 1, from_bits, to_bits) width = (s1p - s1n) / 4 fixed_sigma = np.sqrt(ccd_read_noise**2 + width**2) of = mean_b_offset(data_value, from_bits, to_bits, ccd_gain, ccd_offset, fixed_sigma) return decode_sqrt_simple(data_value, from_bits, to_bits) - of
[docs] def generate_decode_sqrt_table( from_bits: int = 16, to_bits: int = 12, ccd_gain: float = 4.9, ccd_offset: float = 100, ccd_read_noise: float = 17, first_order: bool = False, ) -> np.ndarray: """ Generate a square root decode table between specified bitrate values and CCD parameters. Parameters ---------- from_bits Specified bitrate of encoded image to unpack to_bits Specified bitrate of output data (decoded) ccd_gain CCD gain [e/DN] ccd_offset CCD bias level [DN] ccd_read_noise CCD read noise level [DN] first_order Toggle for first order square root decoding table only, defaults to False Returns ------- table Generated square root decoding table """ table = np.zeros(ceil(2**to_bits)) for i in range(table.size): table[i] = decode_sqrt_corrected(i, from_bits, to_bits, ccd_gain, ccd_offset, ccd_read_noise) if first_order: return table table0 = decode_sqrt_simple(np.arange(0,2**to_bits,1),from_bits,to_bits) table2 = np.copy(table) table2[1:-2] = table[1:-2] + 0.75 * ((table0-table)[0:-3]+(table0-table)[2:-1]-2*(table0-table)[1:-2]) return table2
[docs] def decode_sqrt_by_table(data: np.ndarray | float, table: np.ndarray) -> np.ndarray: """ Generate a square root decode table between specified bitrate values and CCD parameters. Parameters ---------- data Input encoded data array table Square root decoding table Returns ------- np.ndarray Decoded version of input data """ rounded_data = np.round(data).astype(np.int32) if np.any(rounded_data > table.shape[0]): warnings.warn("Data values exceeded square-root encoding lookup table.", DataValueWarning) data = rounded_data.clip(0, table.shape[0]-1) return table[data]
[docs] @punch_task def decode_sqrt_data(data_object: NDCube, overwrite_table: bool = False) -> NDCube: """ Prefect task in the pipeline to decode square root encoded data. Parameters ---------- data_object : NDCube the object you wish to decode overwrite_table Toggle to regenerate and overwrite existing decoding table Returns ------- NDCube a modified version of the input with the data square root decoded """ data = data_object.data from_bits = data_object.meta["RAWBITS"].value to_bits = data_object.meta["COMPBITS"].value ccd_gain_bottom = data_object.meta["GAINBTM"].value ccd_gain_top = data_object.meta["GAINTOP"].value ccd_offset = data_object.meta["OFFSET"].value or 400 # in the case it's not provided, we use 400 ccd_read_noise = 17 # DN # TODO: make this not a hardcoded value! decoded_data = decode_sqrt( data, from_bits=from_bits, to_bits=to_bits, ccd_gain_bottom=ccd_gain_bottom, ccd_gain_top=ccd_gain_top, ccd_offset=ccd_offset, ccd_read_noise=ccd_read_noise, overwrite_table=overwrite_table, reconstitute_noise=True, dateobs = data_object.meta.datetime, ) decoded_saturation_value = decode_sqrt( data_object.meta["DSATVAL"].value, from_bits=from_bits, to_bits=to_bits, ccd_gain_bottom=ccd_gain_bottom, ccd_gain_top=ccd_gain_top, ccd_offset=ccd_offset, ccd_read_noise=ccd_read_noise, overwrite_table=overwrite_table, reconstitute_noise=False, ) data_object.meta["DSATVAL"] = decoded_saturation_value data_object.data[...] = decoded_data[...] # TODO: we need to make sure the data type changes? data_object.meta.history.add_now("LEVEL0-decode-sqrt", "image square root decoded") return data_object