Source code for punchbowl.level3.low_noise
from datetime import UTC, datetime
import astropy.units as u
import numpy as np
import solpolpy
from astropy.nddata import StdDevUncertainty
from dateutil.parser import parse as parse_datetime
from dateutil.parser import parse as parse_datetime_str
from ndcube import NDCollection, NDCube
from punchbowl.data.meta import NormalizedMetadata
from punchbowl.data.punch_io import check_outlier
from punchbowl.level2.merge import _merge_ndcubes
from punchbowl.prefect import punch_task
from punchbowl.util import average_datetime
KEYWORD_OMIT = ("COMMENT", "HISTORY", "", "NAXIS3", "OBSTYPE", "OBS-MODE", "OBSLAYR1", "OBSLAYR2", "OBSLAYR3")
[docs]
@punch_task
def create_low_noise_task(cubes: list[NDCube], reference_time: str | datetime | None = None) -> NDCube:
"""Create a low noise image from a set of inputs."""
cube_count = len(cubes)
cubes = [cube for cube in cubes if not check_outlier(cube)]
if isinstance(reference_time, str):
reference_time = parse_datetime_str(reference_time)
# TODO - Note to future self: be clever and use the outlier flag to excise bad data spatially.
reference_cube_index = len(cubes)//2 - 1
new_cube = _merge_ndcubes(cubes, reference_cube_index=reference_cube_index)
if new_cube.data.ndim == 3:
mzp_map = [-60, 0, 60]
layer_map = {"M": 0, "Z": 1, "P": 2}
mzp_collection = NDCollection(
[(k, NDCube(
data=new_cube.data[layer_map[k], ...],
wcs=new_cube.wcs[layer_map[k]],
meta={
"POLAR": mzp_map[layer_map[k]] * u.degree,
"POLAROFF": 0,
"POLARREF": "solar",
},
))
for k in ["M", "Z", "P"]],
aligned_axes=(0, 1),
)
bpb_collection = solpolpy.resolve(mzp_collection, "bp3")
new_code = cubes[0].meta.product_code[0] + "A" + cubes[0].meta.product_code[2]
new_meta = NormalizedMetadata.load_template(new_code, "3")
for k in cubes[0].meta.fits_keys:
if k not in KEYWORD_OMIT and k in new_meta:
new_meta[k] = cubes[reference_cube_index].meta[k].value
# If any input data are excluded, flag this as an outlier
if (cube_count != len(cubes)
or not all(cube.meta["HAS_WFI1"] for cube in cubes)
or not all(cube.meta["HAS_WFI2"] for cube in cubes)
or not all(cube.meta["HAS_WFI3"] for cube in cubes)):
new_meta["OUTLIER"] = 1
new_meta.provenance = [c.meta["FILENAME"] for c in cubes]
mean_date = average_datetime([cube.meta.datetime for cube in cubes])
date_obs = mean_date if reference_time is None else reference_time
times_beg = np.array([parse_datetime(cube.meta["DATE-BEG"].value).replace(tzinfo=UTC).timestamp()
for cube in cubes])
times_end = np.array([parse_datetime(cube.meta["DATE-END"].value).replace(tzinfo=UTC).timestamp()
for cube in cubes])
new_meta["TYPECODE"] = new_code[0:2]
new_meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
new_meta["DATE-OBS"] = date_obs.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
new_meta["DATE-AVG"] = mean_date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
new_meta["DATE-BEG"] = datetime.fromtimestamp(np.min(times_beg),
tz=UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
new_meta["DATE-END"] = datetime.fromtimestamp(np.max(times_end),
tz=UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
new_cube.meta = new_meta
if new_cube.data.ndim == 3:
# # TODO - Fully propagate uncertainty
new_uncertainty = np.copy(new_cube.uncertainty.array[0,...])
new_uncertainty = np.stack([new_uncertainty] * 3, axis=0)
return NDCube(data = np.stack([bpb_collection[k].data for k in ["B", "pB", "pBp"]]),
uncertainty=StdDevUncertainty(new_uncertainty),
wcs=new_cube.wcs,
meta=new_cube.meta)
return new_cube