import os.path
import warnings
from math import ceil
from datetime import datetime
import numpy as np
from ndcube import NDCube
from punchbowl.exceptions import DataValueWarning
from punchbowl.prefect import punch_task
TABLE_PATH = os.path.dirname(__file__) + "/decoding_tables/"
[docs]
def decode_sqrt(
data: np.ndarray | float,
from_bits: int = 16,
to_bits: int = 12,
ccd_gain_bottom: float = 4.9,
ccd_gain_top: float = 4.9,
ccd_offset: float = 100,
ccd_read_noise: float = 17,
overwrite_table: bool = False,
reconstitute_noise: bool = True,
dateobs: datetime | None = None,
) -> np.ndarray:
"""
Square root decode between specified bitrate values.
Parameters
----------
data
Input encoded data array
from_bits
Specified bitrate of encoded image to unpack
to_bits
Specified bitrate of output data (decoded)
ccd_gain_bottom
CCD gain (bottom side of CCD) [e / DN]
ccd_gain_top
CCD gain (top side of CCD) [e / DN]
ccd_offset
CCD bias level [DN]
ccd_read_noise
CCD read noise level [DN]
overwrite_table
Toggle to regenerate and overwrite existing decoding table
reconstitute_noise
If True, adds appropriate noise after decoding. Otherwise, only square root decodes.
dateobs
The reference time to use as the random noise seed. If None, then the result is random and not reproducible.
Returns
-------
np.ndarray
Square root decoded version of the input image
"""
table_name_bottom = (
TABLE_PATH
+ "tab_fb"
+ str(from_bits)
+ "_tb"
+ str(to_bits)
+ "_g"
+ str(ccd_gain_bottom)
+ "_b"
+ str(ccd_offset)
+ "_r"
+ str(ccd_read_noise)
+ ".npy"
)
table_name_top = (
TABLE_PATH
+ "tab_fb"
+ str(from_bits)
+ "_tb"
+ str(to_bits)
+ "_g"
+ str(ccd_gain_top)
+ "_b"
+ str(ccd_offset)
+ "_r"
+ str(ccd_read_noise)
+ ".npy"
)
if not os.path.isfile(table_name_bottom) or overwrite_table:
table_bottom = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_bottom, ccd_offset, ccd_read_noise)
if not os.path.isdir(TABLE_PATH):
os.makedirs(TABLE_PATH, exist_ok=True)
np.save(table_name_bottom, table_bottom)
else:
table_bottom = np.load(table_name_bottom)
if not os.path.isfile(table_name_top) or overwrite_table:
table_top = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain_top, ccd_offset, ccd_read_noise)
if not os.path.isdir(TABLE_PATH):
os.makedirs(TABLE_PATH, exist_ok=True)
np.save(table_name_top, table_top)
else:
table_top = np.load(table_name_top)
if isinstance(data, np.ndarray):
out_data = np.empty_like(data)
out_data[0:data.shape[1]//2,:] = decode_sqrt_by_table(data[0:data.shape[1]//2,:], table_bottom)
out_data[data.shape[1]//2:,:] = decode_sqrt_by_table(data[data.shape[1]//2:,:], table_top)
else:
out_data = decode_sqrt_by_table(data, table_bottom)
if reconstitute_noise:
scale = np.sqrt(out_data) / 2.2
if dateobs is None:
noise = np.random.normal(scale=scale)
else:
seed = int(dateobs.timestamp())
rng = np.random.default_rng(seed)
noise = rng.normal(scale=scale)
return out_data + noise
return out_data
[docs]
def encode_sqrt(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray:
"""
Square root encode between specified bitrate values.
Parameters
----------
data
Input data array
from_bits
Specified bitrate of original input image
to_bits
Specified bitrate of output encoded image
Returns
-------
np.ndarray
Encoded version of input data
"""
data = np.round(data).astype(np.int32).clip(0, None)
factor = np.array(2 ** (2 * to_bits - from_bits))
data_scaled_by_factor = np.round(data * factor).astype(np.int32)
return np.floor(np.sqrt(data_scaled_by_factor)).astype(np.int32)
[docs]
def decode_sqrt_simple(data: np.ndarray | float, from_bits: int = 16, to_bits: int = 12) -> np.ndarray:
"""
Perform a simple decoding using the naive squaring strategy.
Parameters
----------
data
Input data array
from_bits
Specified bitrate of original input image
to_bits
Specified bitrate of output encoded image
Returns
-------
np.ndarray
Decoded version of input data
"""
data = np.round(data).astype(np.int32).clip(0, None)
factor = 2.0 ** (2 * to_bits - from_bits)
return np.round(np.square(data) / factor).astype(np.int32)
[docs]
def noise_pdf(
data_value: np.ndarray | float,
ccd_gain: float = 4.9,
ccd_offset: float = 100,
ccd_read_noise: float = 17,
n_sigma: int = 5,
n_steps: int = 10000,
) -> tuple:
"""
Generate a probability distribution function (pdf) from an input data value.
Parameters
----------
data_value
Input data value
ccd_gain
CCD gain [e/DN]
ccd_offset
CCD bias level [DN]
ccd_read_noise
CCD read noise level [DN]
n_sigma
Number of sigma steps
n_steps
Number of data steps
Returns
-------
np.ndarray
Data step distribution
normal
Data normal distribution
"""
# Use camera calibration to get an e-count
electrons = np.clip((data_value - ccd_offset) * ccd_gain, 1, None)
# Shot noise, converted back to DN
poisson_sigma = np.sqrt(electrons) / ccd_gain
# Total sigma is quadrature sum of fixed & shot
sigma = np.sqrt(poisson_sigma**2 + ccd_read_noise**2)
dn_steps = np.arange(-n_sigma * sigma, n_sigma * sigma, sigma * n_sigma * 2 / n_steps)
# Explicitly calculate the Gaussian/normal PDF at each step
normal = np.exp(-dn_steps * dn_steps / sigma / sigma / 2)
# Easier to normalize numerically than to account for missing tails
normal = normal / np.sum(normal)
return data_value + dn_steps, normal
[docs]
def mean_b_offset(
data_value: float,
from_bits: int = 16,
to_bits: int = 12,
ccd_gain: float = 4.9,
ccd_offset: float = 100,
ccd_read_noise: float = 17,
) -> float:
"""
Compute an offset from the naive and robust decoding processes.
Parameters
----------
data_value
Input data value [DN]
from_bits
Specified bitrate of encoded image to unpack
to_bits
Specified bitrate of output data (decoded)
ccd_gain
CCD gain [e/DN]
ccd_offset
CCD bias level [DN]
ccd_read_noise
CCD read noise level [DN]
Returns
-------
float
Generated decoding value for use in constructing a decoding table
"""
naive_decoded_value = decode_sqrt_simple(data_value, from_bits, to_bits)
# Generate distribution around naive value
(values, weights) = noise_pdf(naive_decoded_value, ccd_gain, ccd_offset, ccd_read_noise)
# Ignore values below the offset -- which break the noise model
weights = weights * (values >= ccd_offset)
if np.sum(weights) < 0.95:
return 0
weights = weights / np.sum(weights)
# Encode the entire value distribution
data_values = encode_sqrt(values, from_bits, to_bits)
# Decode the entire value distribution to find the net offset
net_offset = decode_sqrt_simple(data_values, from_bits, to_bits)
# Expected value of the entire distribution
expected_value = np.sum(net_offset * weights)
# Return ΔB.
return expected_value - naive_decoded_value
[docs]
def decode_sqrt_corrected(
data_value: float,
from_bits: int = 16,
to_bits: int = 12,
ccd_gain: float = 4.9,
ccd_offset: float = 100,
ccd_read_noise: float = 17,
) -> float:
"""
Compute an individual decoding value for an input data value.
Parameters
----------
data_value
Input data value [DN]
from_bits
Specified bitrate of encoded image to unpack
to_bits
Specified bitrate of output data (decoded)
ccd_gain
CCD gain [e/DN]
ccd_offset
CCD bias level [DN]
ccd_read_noise
CCD read noise level [DN]
Returns
-------
float
Generated decoding value for use in constructing a decoding table
"""
s1p = decode_sqrt_simple(data_value + 1, from_bits, to_bits)
s1n = decode_sqrt_simple(data_value - 1, from_bits, to_bits)
width = (s1p - s1n) / 4
fixed_sigma = np.sqrt(ccd_read_noise**2 + width**2)
of = mean_b_offset(data_value, from_bits, to_bits, ccd_gain, ccd_offset, fixed_sigma)
return decode_sqrt_simple(data_value, from_bits, to_bits) - of
[docs]
def generate_decode_sqrt_table(
from_bits: int = 16,
to_bits: int = 12,
ccd_gain: float = 4.9,
ccd_offset: float = 100,
ccd_read_noise: float = 17,
first_order: bool = False,
) -> np.ndarray:
"""
Generate a square root decode table between specified bitrate values and CCD parameters.
Parameters
----------
from_bits
Specified bitrate of encoded image to unpack
to_bits
Specified bitrate of output data (decoded)
ccd_gain
CCD gain [e/DN]
ccd_offset
CCD bias level [DN]
ccd_read_noise
CCD read noise level [DN]
first_order
Toggle for first order square root decoding table only, defaults to False
Returns
-------
table
Generated square root decoding table
"""
table = np.zeros(ceil(2**to_bits))
for i in range(table.size):
table[i] = decode_sqrt_corrected(i, from_bits, to_bits, ccd_gain, ccd_offset, ccd_read_noise)
if first_order:
return table
table0 = decode_sqrt_simple(np.arange(0,2**to_bits,1),from_bits,to_bits)
table2 = np.copy(table)
table2[1:-2] = table[1:-2] + 0.75 * ((table0-table)[0:-3]+(table0-table)[2:-1]-2*(table0-table)[1:-2])
return table2
[docs]
def decode_sqrt_by_table(data: np.ndarray | float, table: np.ndarray) -> np.ndarray:
"""
Generate a square root decode table between specified bitrate values and CCD parameters.
Parameters
----------
data
Input encoded data array
table
Square root decoding table
Returns
-------
np.ndarray
Decoded version of input data
"""
rounded_data = np.round(data).astype(np.int32)
if np.any(rounded_data > table.shape[0]):
warnings.warn("Data values exceeded square-root encoding lookup table.", DataValueWarning)
data = rounded_data.clip(0, table.shape[0]-1)
return table[data]
[docs]
@punch_task
def decode_sqrt_data(data_object: NDCube, overwrite_table: bool = False) -> NDCube:
"""
Prefect task in the pipeline to decode square root encoded data.
Parameters
----------
data_object : NDCube
the object you wish to decode
overwrite_table
Toggle to regenerate and overwrite existing decoding table
Returns
-------
NDCube
a modified version of the input with the data square root decoded
"""
data = data_object.data
from_bits = data_object.meta["RAWBITS"].value
to_bits = data_object.meta["COMPBITS"].value
ccd_gain_bottom = data_object.meta["GAINBTM"].value
ccd_gain_top = data_object.meta["GAINTOP"].value
ccd_offset = data_object.meta["OFFSET"].value or 400 # in the case it's not provided, we use 400
ccd_read_noise = 17 # DN # TODO: make this not a hardcoded value!
decoded_data = decode_sqrt(
data,
from_bits=from_bits,
to_bits=to_bits,
ccd_gain_bottom=ccd_gain_bottom,
ccd_gain_top=ccd_gain_top,
ccd_offset=ccd_offset,
ccd_read_noise=ccd_read_noise,
overwrite_table=overwrite_table,
reconstitute_noise=True,
dateobs = data_object.meta.datetime,
)
decoded_saturation_value = decode_sqrt(
data_object.meta["DSATVAL"].value,
from_bits=from_bits,
to_bits=to_bits,
ccd_gain_bottom=ccd_gain_bottom,
ccd_gain_top=ccd_gain_top,
ccd_offset=ccd_offset,
ccd_read_noise=ccd_read_noise,
overwrite_table=overwrite_table,
reconstitute_noise=False,
)
data_object.meta["DSATVAL"] = decoded_saturation_value
data_object.data[...] = decoded_data[...] # TODO: we need to make sure the data type changes?
data_object.meta.history.add_now("LEVEL0-decode-sqrt", "image square root decoded")
return data_object