from __future__ import annotations
import os
import string
import hashlib
import os.path
import warnings
import traceback
import subprocess
import multiprocessing as mp
from copy import deepcopy
from typing import Any
from pathlib import Path
from itertools import repeat
from concurrent.futures import ProcessPoolExecutor
import astropy.units as u
import lxml.etree as et
import numpy as np
from astropy.io import fits
from astropy.io.fits import Header
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS, FITSFixedWarning
from glymur import Jp2k, jp2box
from matplotlib.colors import PowerNorm
from ndcube import NDCube
from PIL import Image, ImageDraw, ImageFont
from punchbowl.data.meta import NormalizedMetadata
from punchbowl.data.visualize import cmap_punch
_ROOT = os.path.abspath(os.path.dirname(__file__))
CALIBRATION_ANNOTATION = "{OBSRVTRY} - {TYPECODE}{OBSCODE} - {DATE-OBS} - exptime: {EXPTIME} s - polarizer: {POLAR} deg"
[docs]
def write_file_hash(path: str) -> None:
"""Create a SHA-256 hash for a file."""
file_hash = hashlib.sha256()
with open(path, "rb") as f:
fb = f.read()
file_hash.update(fb)
with open(path + ".sha", "w") as f:
f.write(file_hash.hexdigest())
[docs]
def get_base_file_name(cube: NDCube) -> str:
"""Determine the base file name without file type extension."""
obscode = cube.meta["OBSCODE"].value
file_level = cube.meta["LEVEL"].value
type_code = cube.meta["TYPECODE"].value
date_string = cube.meta.datetime.strftime("%Y%m%d%H%M%S")
file_version = cube.meta["FILEVRSN"].value
file_version = "?" if file_version == "" else file_version # file version should never be empty!
return "PUNCH_L" + file_level + "_" + type_code + obscode + "_" + date_string + "_v" + file_version
[docs]
def _generate_jp2_xmlbox(header: Header) -> jp2box.XMLBox:
"""
Generate the JPEG2000 XML box to be inserted into the JPEG2000 file.
(Helper function adapted from SunPy)
"""
header_xml = _header_to_xml(header)
meta = et.Element("meta")
meta.append(header_xml)
tree = et.ElementTree(meta)
return jp2box.XMLBox(xml=tree)
[docs]
def write_ndcube_to_quicklook(cube: NDCube,
filename: str,
layer: int | None = None,
vmin: float = 1e-15,
vmax: float = 8e-12,
include_meta: bool = True,
annotation: str | None = None,
color: bool = False,
gamma: float = 1/2.2) -> None:
"""
Write an NDCube to a Quicklook format as a jpeg.
Parameters
----------
cube : NDCube
data cube to visualize
filename : str
path to save output, must end in .jp2, .j2k, .jpeg, .jpg
layer : int | None
if the cube is 3D, then selects cube.data[layer] for visualization
vmin : float
the lower limit value to visualize
vmax : float
the upper limit value to visualize
include_meta : bool
whether to include metadata in the JPEG2000 file
annotation : str | None
a formatted string to add to the bottom of the image as a label
can access metadata by key, e.g. "typecode={TYPECODE}" would write the data's typecode into the image
color : bool
flag to generate RGB quicklook files, grayscale by default
gamma : float
power law exponent used for color normalization
Returns
-------
None
"""
if (len(cube.data.shape) != 2) and layer is None:
msg = "Output data must be two-dimensional, or a layer must be specified"
raise ValueError(msg)
if not filename.endswith((".jp2", ".j2k", ".jpeg", ".jpg")):
msg = ("Filename must have a valid file extension `.jpeg`, `jpg`, `.jp2` or `.j2k`"
f"Found: {os.path.splitext(filename)[1]}")
raise ValueError(msg)
norm = PowerNorm(gamma = gamma, vmin=vmin, vmax=vmax)
if layer is not None: # noqa: SIM108
image = cube.data[layer, :, :]
else:
image = cube.data
if color:
mode = "RGBA"
scaled_arr = (cmap_punch(norm(np.flipud(image))) * 255).astype(np.uint8)
fill_value = (255, 255, 255)
else:
mode = "L"
scaled_arr = (np.clip(norm(np.flipud(image)), 0, 1) * 255).astype(np.uint8)
fill_value = 255
pil_image = Image.fromarray(scaled_arr, mode=mode)
if annotation:
pad_height = int(image.shape[1] * 50 / 2048)
padded_image = Image.new(mode, (pil_image.width, pil_image.height + pad_height))
padded_image.paste(pil_image, (0, 0))
draw = ImageDraw.Draw(padded_image)
font = ImageFont.load_default(size=int(pad_height / 2))
formatter = DefaultFormatter()
text = formatter.format(annotation, **cube.meta)
text_offset = int(10 * image.shape[1] / 2048)
text_position = (text_offset, pil_image.height + text_offset)
draw.text(text_position, text, font=font, fill=fill_value)
pil_image = padded_image
arr_image = np.array(pil_image)
tmp_filename = f"{filename}tmp.jp2"
jp2 = Jp2k(tmp_filename, arr_image)
meta_boxes = jp2.box
target_index = len(meta_boxes) - 1
if include_meta:
header = cube.meta.to_fits_header(wcs=cube.wcs)
header.remove("COMMENT", ignore_missing=True, remove_all=True)
fits_box = _generate_jp2_xmlbox(header)
meta_boxes.insert(target_index, fits_box)
jp2.wrap(filename, boxes=meta_boxes)
os.remove(tmp_filename)
[docs]
def write_quicklook_to_mp4(files: list[str],
filename: str,
framerate: int = 5,
resolution: int = 1024,
codec: str = "libx264",
ffmpeg_cmd: str = "ffmpeg",
) -> None:
"""
Write a list of input quicklook jpeg2000 files to an output mp4 animation.
Parameters
----------
files : list[str]
List of input files to animate
filename : str
Output filename
framerate : int, optional
Frame rate (default 5)
resolution : int, optional
Output resolution (default 1024)
codec : str, optional
Codec to use for encoding. For GPU acceleration.
"h264_videotoolbox" can be used on ARM Macs, "h264_nvenc" can be used on Intel machines.
ffmpeg_cmd : str
path to the ffmpeg executable
"""
input_sequence = f"concat:{'|'.join(files)}"
ffmpeg_command = [
ffmpeg_cmd,
"-framerate", str(framerate),
"-i", input_sequence,
"-vf", f"scale=-1:{resolution}",
"-c:v", codec,
"-pix_fmt", "yuv420p",
"-y",
filename,
]
subprocess.run(ffmpeg_command, check=False) # noqa: S603
[docs]
def write_ndcube_to_fits(cube: NDCube,
filename: str,
overwrite: bool = False,
write_hash: bool = True,
skip_stats: bool = False,
skip_wcs_conversion: bool = False,
uncertainty_quantize_level: float = 16) -> None:
"""Write an NDCube as a FITS file."""
if not filename.endswith(".fits"):
msg = (
"Filename must have a valid file extension `.fits`"
f"Found: {os.path.splitext(filename)[1]}"
)
raise ValueError(msg)
meta = cube.meta if skip_stats else _update_statistics(cube)
full_header = meta.to_fits_header(wcs=cube.wcs, write_celestial_wcs=not skip_wcs_conversion)
full_header["FILENAME"] = os.path.basename(filename)
hdu_data = fits.CompImageHDU(data=cube.data.astype(np.float32) if cube.data.dtype == np.float64 else cube.data,
header=full_header,
name="Primary data array")
hdu_provenance = fits.BinTableHDU.from_columns(fits.ColDefs([fits.Column(
name="provenance", format="A40", array=np.char.array(meta.provenance))]))
hdu_provenance.name = "File provenance"
hdul = cube.wcs.to_fits()
hdul[0] = fits.PrimaryHDU()
hdul.insert(1, hdu_data)
if meta["LEVEL"].value != "0" and cube.uncertainty is not None:
hdu_uncertainty = fits.CompImageHDU(data=_pack_uncertainty(cube),
header=full_header,
name="Uncertainty array",
quantize_level=uncertainty_quantize_level,
quantize_method=2)
hdul.insert(2, hdu_uncertainty)
hdul.append(hdu_provenance)
hdul.writeto(filename, overwrite=overwrite, checksum=True)
hdul.close()
if write_hash:
write_file_hash(filename)
[docs]
def _pack_uncertainty(cube: NDCube) -> np.ndarray:
"""Compress the uncertainty for writing to file."""
output = np.zeros_like(cube.data) - 999 if cube.uncertainty is None else 1 / (cube.uncertainty.array / cube.data)
if cube.mask is not None:
output[cube.mask] = 0
return output
[docs]
def _unpack_uncertainty(uncertainty_array: np.ndarray, data_array: np.ndarray) -> np.ndarray:
"""Uncompress the uncertainty when reading from a file."""
# This is (1/uncertainty_array) * data_array, but this way we save time on memory allocation
with np.errstate(divide="ignore", invalid="ignore"):
np.divide(1, uncertainty_array, out=uncertainty_array)
np.multiply(data_array, uncertainty_array, out=uncertainty_array)
uncertainty_array[np.isnan(uncertainty_array) * (data_array == 0)] = np.inf
return uncertainty_array
[docs]
def _update_statistics(cube: NDCube, modify_inplace: bool = False) -> NormalizedMetadata:
"""Update image statistics in metadata before writing to file."""
meta = cube.meta
if not modify_inplace:
meta = deepcopy(meta)
meta["DATAZER"] = len(np.where(cube.data == 0)[0])
meta["DATASAT"] = len(np.where(cube.data >= meta["DSATVAL"].value)[0])
nonzero_data = cube.data[np.isfinite(cube.data) * (cube.data != 0)].flatten()
if len(nonzero_data) > 0:
meta["DATAAVG"] = np.mean(nonzero_data).item()
meta["DATAMDN"] = np.median(nonzero_data).item()
meta["DATASIG"] = np.std(nonzero_data).item()
else:
meta["DATAAVG"] = -999.0
meta["DATAMDN"] = -999.0
meta["DATASIG"] = -999.0
percentile_percentages = [1, 10, 25, 50, 75, 90, 95, 98, 99]
if len(nonzero_data) > 0:
percentile_values = np.percentile(nonzero_data, percentile_percentages)
if np.any(np.isnan(percentile_values)): # report nan if any of the values are nan
percentile_values = [-999.0 for _ in percentile_percentages]
for percent, value in zip(percentile_percentages, percentile_values, strict=True):
meta[f"DATAP{percent:02d}"] = value
meta["DATAMIN"] = np.min(nonzero_data).item()
meta["DATAMAX"] = np.max(nonzero_data).item()
else:
for percent in percentile_percentages:
meta[f"DATAP{percent:02d}"] = -999.0
meta["DATAMIN"] = 0.0
meta["DATAMAX"] = 0.0
return meta
[docs]
def load_ndcube_from_fits(path: str | Path, key: str = " ", include_provenance: bool = True,
include_uncertainty: bool = True, dtype: type = float) -> NDCube:
"""Load an NDCube from a FITS file."""
with warnings.catch_warnings(), fits.open(path) as hdul:
warnings.filterwarnings(action="ignore", message=".*CROTA.*Human-readable solar north pole angle.*",
category=FITSFixedWarning)
primary_hdu = hdul[1]
data = primary_hdu.data
header = primary_hdu.header
# Reset checksum and datasum to match astropy.io.fits behavior
header["CHECKSUM"] = ""
header["DATASUM"] = ""
meta = NormalizedMetadata.from_fits_header(header)
if include_provenance:
if isinstance(hdul[-1], fits.hdu.BinTableHDU):
meta._provenance = hdul[-1].data["provenance"] # noqa: SLF001
else:
msg = "Provenance HDU does not appear to be BinTableHDU type."
raise ValueError(msg)
if "CPDIS1A" in header:
# Work around a possible astropy bug, see https://github.com/astropy/astropy/issues/18914
del header["CPDIS1A"]
del header["CPDIS2A"]
del header["DP1A"]
del header["DP2A"]
wcs = WCS(header, hdul, key=key)
unit = u.ct
if include_uncertainty and len(hdul) >= 3 and isinstance(hdul[2], fits.hdu.CompImageHDU):
secondary_hdu = hdul[2]
uncertainty = _unpack_uncertainty(secondary_hdu.data.astype(float), data).astype(dtype)
mask = np.isinf(uncertainty)
uncertainty = StdDevUncertainty(uncertainty)
else:
uncertainty = None
mask = None
return NDCube(
data.view(dtype=data.dtype.newbyteorder()).byteswap().astype(dtype),
wcs=wcs,
uncertainty=uncertainty,
meta=meta,
unit=unit,
mask=mask,
)
[docs]
def _load_many_cubes_caller(path: str | Path, kwargs: dict, allow_errors: bool) -> NDCube | str:
try:
return load_ndcube_from_fits(path, **kwargs)
except:
if allow_errors:
return traceback.format_exc()
raise
[docs]
def load_many_cubes_iterable(paths: list[str | Path], n_workers: int | None = None, allow_errors: bool = False,
**kwargs: dict) -> list[NDCube | str]:
"""
Load many NDCubes in parallel.
Does not fork so as to be Prefect-compatible.
When used as an iterator, cubes are yielded as they are loaded, allowing e.g. progress messages to be printed
Parameters
----------
paths
File paths to load.
n_workers
Number of parallel workers to use. A large number may overwhelm the main thread (which has to receive each
loaded cube), limiting the speed benefits of using many workers.
allow_errors
If True, if an exception is raised when loading a cube, the traceback is yielded rather than an NDCube. If
False, exceptions are raised in the normal way.
kwargs
Extra args are passed to `load_NDCube_from_fits`
"""
context = mp.get_context("forkserver")
with ProcessPoolExecutor(n_workers, mp_context=context) as p:
yield from p.map(_load_many_cubes_caller, paths, repeat(kwargs), repeat(allow_errors))
[docs]
def load_many_cubes(paths: list[str | Path], n_workers: int | None = None, allow_errors: bool = False,
**kwargs: dict) -> list[NDCube | str]:
"""
Load many NDCubes in parallel.
Does not fork so as to be Prefect-compatible.
Parameters
----------
paths
File paths to load.
n_workers
Number of parallel workers to use. A large number may overwhelm the main thread (which has to receive each
loaded cube), limiting the speed benefits of using many workers.
allow_errors
If True, if an exception is raised when loading a cube, the traceback is yielded rather than an NDCube. If
False, exceptions are raised in the normal way.
kwargs
Extra args are passed to `load_NDCube_from_fits`
Returns
-------
A list of NDCubes (or traceback strings)
"""
return list(load_many_cubes_iterable(paths, n_workers, allow_errors, **kwargs))
[docs]
def check_outlier(cube: NDCube) -> bool:
"""Check the input data cube for outlier flagging."""
for flag in ["OUTLIER", "BADPKTS"]:
if flag not in cube.meta:
warnings.warn(f"Input cube does not contain {flag} keyword.")
elif cube.meta[flag].value == 1:
return True
return False