import os
from copy import deepcopy
from datetime import UTC, datetime
import numpy as np
from ndcube import NDCube
from prefect import get_run_logger
from punchbowl.auto.control.cache_layer.loader_base_class import DataLoader
from punchbowl.data import load_ndcube_from_fits
from punchbowl.data.meta import MetaField, NormalizedMetadata, set_spacecraft_location_to_earth
from punchbowl.level2.finalize import finalize_output
from punchbowl.level2.merge import merge_many_clear_task, merge_many_polarized_task
from punchbowl.level3.f_corona_model import subtract_f_corona_background_task
from punchbowl.level3.low_noise import create_low_noise_task
from punchbowl.level3.polarization import convert_polarization
from punchbowl.level3.stellar import subtract_starfield_background_task
from punchbowl.level3.velocity import plot_flow_map, track_velocity
from punchbowl.prefect import punch_flow
from punchbowl.util import load_image_task, output_image_task
[docs]
@punch_flow
def level3_PIM_CIM_flow(data_list: list[str] | list[NDCube], # noqa: N802
before_f_corona_model_paths: list[str | DataLoader],
after_f_corona_model_paths: list[str | DataLoader],
output_filename: str | None = None) -> list[NDCube]:
"""Level 3 PIM/CIM flow."""
logger = get_run_logger()
logger.info("beginning level 3 PIM/CIM flow")
data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list]
for i, cube in enumerate(data_list):
if len(cube.shape) == 3:
data = np.full((cube.shape[0], cube.meta["FULYSIZE"].value, cube.meta["FULXSIZE"].value), np.nan)
else:
data = np.full((cube.meta["FULYSIZE"].value, cube.meta["FULXSIZE"].value), np.nan)
cropx = cube.meta["CROPX1"].value, cube.meta["CROPX2"].value
cropy = cube.meta["CROPY1"].value, cube.meta["CROPY2"].value
data[..., cropy[0]:cropy[1], cropx[0]:cropx[1]] = cube.data
uncertainty = np.full(data.shape, np.inf)
uncertainty[..., cropy[0]:cropy[1], cropx[0]:cropx[1]] = cube.uncertainty.array
new_cube = NDCube(data, meta=cube.meta, wcs=cube.wcs, uncertainty=uncertainty)
data_list[i] = new_cube
polarized = data_list[0].meta["TYPECODE"].value[1] != "R"
new_type = "PIM" if polarized else "CIM"
trefoil_wcs = data_list[0].wcs.celestial
before_f_corona_models = [load_ndcube_from_fits(path) if isinstance(path, str)
else path.load() for path in before_f_corona_model_paths]
after_f_corona_models = [load_ndcube_from_fits(path) if isinstance(path, str)
else path.load() for path in after_f_corona_model_paths]
data_list = [subtract_f_corona_background_task(d,
before_f_corona_models,
after_f_corona_models) for d in data_list]
if polarized:
merge_layers = []
# The merging code wants our layers separated out as individual cubes
for d in data_list:
if d is None:
continue
for i, angle in enumerate([-60, 0, 60]):
# The input cubes need to have "POLAR" set so it knows which layer is which
m = deepcopy(d.meta)
# The existing meta doesn't have a POLAR key. Hack: just grab a section and cram in the new value.
section = next(iter(m._contents.values())) # noqa: SLF001
section["POLAR"] = MetaField("POLAR", "", angle, int, True, True, 0)
merge_layers.append(NDCube(
d.data[i],
meta=m,
wcs=d.wcs,
uncertainty=d.uncertainty[i],
))
else:
merge_layers = data_list
merger = merge_many_polarized_task if polarized else merge_many_clear_task
output_data = merger(merge_layers, trefoil_wcs, level="3", product_code=new_type)
fcor_files = [c.meta["FILENAME"].value.replace(".fits", "") for c in before_f_corona_models + after_f_corona_models]
output_data.meta.history.add_now("LEVEL3-subtract_f_corona_background",
f"subtracted f corona background using {', '.join(fcor_files)}")
finalize_output(output_data, data_list)
for cube in data_list:
obs_no = cube.meta["OBSCODE"].value
obs = "NFI" if obs_no == "4" else "WFI"
if cube.meta[f"CTRX{obs}{obs_no}"].value > 0:
output_data[0].meta[f"CTRX{obs}{obs_no}"] = cube.meta[f"CTRX{obs}{obs_no}"].value
output_data[0].meta[f"CTRY{obs}{obs_no}"] = cube.meta[f"CTRY{obs}{obs_no}"].value
logger.info("ending level 3 PIM/CIM flow")
if output_filename is not None:
output_image_task(output_data, output_filename)
return [output_data]
[docs]
@punch_flow
def level3_core_flow(data_list: list[str] | list[NDCube],
starfield_background_path: str | None,
output_filename: str | None = None) -> list[NDCube]:
"""Level 3 CTM flow."""
logger = get_run_logger()
logger.info("beginning level 3 CTM flow")
data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list]
is_polarized = data_list[0].meta["TYPECODE"].value == "PT"
data_list = [subtract_starfield_background_task(d, starfield_background_path) for d in data_list]
if is_polarized:
data_list = [convert_polarization(d) for d in data_list]
out_data_list = []
for o in data_list:
out_meta: NormalizedMetadata = NormalizedMetadata.load_template("PTM" if is_polarized else "CTM", "3")
out_meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
out_meta.provenance = [fname for d in data_list if d is not None and (fname := d.meta.get("FILENAME").value)]
out_meta.history = o.meta.history
out_meta["CALSTAR1"] = starfield_background_path
for key in ["FILEVRSN", "ALL_INPT", "HAS_WFI1", "HAS_WFI2", "HAS_WFI3", "HAS_NFI4", "DATE-AVG", "DATE-OBS",
"DATE-BEG", "DATE-END", "CTRXWFI1", "CTRYWFI1", "CTRXWFI2", "CTRYWFI2", "CTRXWFI3", "CTRYWFI3",
"CTRXNFI4", "CTRYNFI4"]:
out_meta[key] = o.meta[key].value
output_data = NDCube(
data=o.data,
uncertainty=o.uncertainty,
wcs=o.wcs,
meta=out_meta,
)
output_data = set_spacecraft_location_to_earth(output_data)
out_data_list.append(output_data)
if output_filename is not None:
output_image_task(out_data_list[0], output_filename)
logger.info("ending level 3 CTM core flow")
return out_data_list
[docs]
@punch_flow
def generate_level3_low_noise_flow(data_list: list[str] | list[NDCube],
output_filename: str | None = None,
reference_time: str | datetime | None = None) -> list[NDCube]:
"""Generate low noise products."""
logger = get_run_logger()
logger.info("Generating low noise products")
data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list]
low_noise_image = create_low_noise_task(data_list, reference_time=reference_time)
if output_filename is not None:
output_image_task(low_noise_image, output_filename)
return [low_noise_image]
[docs]
@punch_flow
def generate_level3_velocity_flow(data_list: list[str],
output_filename: str | None = None) -> list[NDCube]:
"""Generate Level 3 velocity data product."""
logger = get_run_logger()
logger.info("Generating velocity data product")
velocity_data, plot_parameters = track_velocity(data_list)
if output_filename is not None:
output_image_task(velocity_data, output_filename)
plot_filename = f"{os.path.splitext(output_filename)[0]}.png"
plot_flow_map(plot_filename, **plot_parameters)
return [velocity_data]