Source code for punchbowl.levelq.flow

import os
from datetime import UTC, datetime

import numpy as np
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from ndcube import NDCube
from prefect import get_run_logger

from punchbowl.data import NormalizedMetadata, get_base_file_name
from punchbowl.data.meta import set_spacecraft_location_to_earth
from punchbowl.data.punch_io import load_many_cubes
from punchbowl.data.wcs import load_trefoil_wcs
from punchbowl.level2.merge import merge_many_polarized_task
from punchbowl.level2.preprocess import preprocess_trefoil_inputs
from punchbowl.level2.resample import find_central_pixel, reproject_many_flow
from punchbowl.level3.f_corona_model import subtract_f_corona_background_task
from punchbowl.level3.low_noise import create_low_noise_task
from punchbowl.levelq.pca import pca_filter
from punchbowl.prefect import punch_flow
from punchbowl.util import DataLoader, average_datetime, find_first_existing_file, load_image_task, output_image_task

ORDER_QP = ["QM1", "QZ1", "QP1",
            "QM2", "QZ2", "QP2",
            "QM3", "QZ3", "QP3"]

SPACECRAFT_OBSCODE = {"1": "WFI1",
                      "2": "WFI2",
                      "3": "WFI3",
                      "4": "NFI4",
                      "N": "NFI4"}

[docs] @punch_flow def levelq_QNN_core_flow(data_list: list[str] | list[NDCube], #noqa: N802 output_filename: list[str] | None = None, files_to_fit: list[str | NDCube | DataLoader] | None = None, data_root: str | None = None) -> list[NDCube]: """ Run the LQ QNN flow. This flow is designed to run on a batch of input CR4 images to facilitate more efficient PCA fitting. Parameters ---------- data_list : list[str | NDCube] The input images, either as paths or NDCubes output_filename : list[str] Optional output paths at which the QNN files should be written files_to_fit : list[str | NDCube | DataLoader] Additional files to use for the PCA fitting, but not to actually be filtered or output data_root : str The root directory which the paths in ``data_list`` are relative to Returns ------- output_cubes : list[NDCube] The QNN data cubes """ logger = get_run_logger() logger.info("beginning level quickPUNCH QNN core flow") logger.info(f"Got {len(data_list)} input files and {len(files_to_fit)} extra files for fitting") output_cubes = [] data_cubes = [input_file for input_file in data_list if isinstance(input_file, NDCube)] input_paths = [input_file for input_file in data_list if isinstance(input_file, str)] if data_root is not None: input_paths = [os.path.join(data_root, path) for path in input_paths] data_cubes += load_many_cubes(input_paths, n_workers=3) logger.info("Loaded images to be subtracted") pca_filter(data_cubes, files_to_fit) for i, data_cube in enumerate(data_cubes): data = data_cube.data uncertainty = data_cube.uncertainty.array isnan = np.isnan(data) uncertainty[isnan] = np.inf data[isnan] = 0 output_meta_nfi = NormalizedMetadata.load_template("QNN", "Q") output_cube = NDCube( data=data, uncertainty=StdDevUncertainty(uncertainty), wcs=data_cube.wcs, meta=output_meta_nfi, ) output_cube.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_cube.meta["DATE-AVG"] = data_cube.meta["DATE-AVG"].value output_cube.meta["DATE-OBS"] = data_cube.meta["DATE-OBS"].value output_cube.meta["DATE-BEG"] = data_cube.meta["DATE-BEG"].value output_cube.meta["DATE-END"] = data_cube.meta["DATE-END"].value output_cube.meta["FILEVRSN"] = data_cube.meta["FILEVRSN"].value set_spacecraft_location_to_earth(output_cube) output_cubes.append(output_cube) if output_filename is not None and i < len(output_filename) and output_filename[i] is not None: output_image_task(output_cube, output_filename[i]) logger.info("ending level quickPUNCH QNN core flow") return output_cubes
[docs] @punch_flow def levelq_CQM_core_flow(data_list: list[str] | list[NDCube], #noqa: N802, C901 output_filename: list[str] | None = None, trim_edges_px: int = 0, alphas_file: str | None = None, trefoil_wcs: WCS | None = None, trefoil_shape: tuple[int, int] | None = None, ) -> list[NDCube]: """Level quickPUNCH core flow.""" logger = get_run_logger() logger.info("beginning level quickPUNCH CQM core flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] if data_list: ordered_data_list: list[NDCube | None] = [None for _ in range(len(ORDER_QP))] for i, order_element in enumerate(ORDER_QP): for data_element in data_list: typecode = data_element.meta["TYPECODE"].value obscode = data_element.meta["OBSCODE"].value if typecode == order_element[:2] and obscode == order_element[2]: ordered_data_list[i] = data_element logger.info("Ordered files are " f"{[get_base_file_name(cube) if cube is not None else None for cube in ordered_data_list]}") quickpunch_mosaic_wcs, quickpunch_mosaic_shape = load_trefoil_wcs() if trefoil_wcs is not None: quickpunch_mosaic_wcs = trefoil_wcs if trefoil_shape is not None: quickpunch_mosaic_shape = trefoil_shape preprocess_trefoil_inputs(data_list, [None] * len(data_list), trim_edges_px, alphas_file) data_list_mosaic = reproject_many_flow(ordered_data_list, quickpunch_mosaic_wcs, quickpunch_mosaic_shape) output_dateobs = average_datetime( [d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_datebeg = min([d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_dateend = max([d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic = merge_many_polarized_task(data_list_mosaic, quickpunch_mosaic_wcs, level="Q", product_code="CQM") for d in filter(None, data_list_mosaic): spacecraft = SPACECRAFT_OBSCODE[d.meta["OBSCODE"].value] output_data_mosaic.meta[f"HAS_{spacecraft}"] = 1 output_data_mosaic.meta["ALL_INPT"] = {output_data_mosaic.meta["HAS_WFI1"], output_data_mosaic.meta["HAS_WFI2"], output_data_mosaic.meta["HAS_WFI3"], output_data_mosaic.meta["HAS_NFI4"]} == {1} centers = find_central_pixel(ordered_data_list, quickpunch_mosaic_wcs) for center, cube in zip(centers, ordered_data_list, strict=False): if center is None: continue cx, cy = center obs_no = cube.meta["OBSCODE"].value obs = "NFI" if obs_no == "N" else "WFI" obs_no = "4" if obs_no == "N" else obs_no output_data_mosaic.meta[f"CTRX{obs}{obs_no}"] = cx output_data_mosaic.meta[f"CTRY{obs}{obs_no}"] = cy output_data_mosaic.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic.meta["DATE-AVG"] = output_dateobs output_data_mosaic.meta["DATE-OBS"] = output_dateobs output_data_mosaic.meta["DATE-BEG"] = output_datebeg output_data_mosaic.meta["DATE-END"] = output_dateend output_data_mosaic.meta["FILEVRSN"] = find_first_existing_file(data_list).meta["FILEVRSN"].value output_data_mosaic = set_spacecraft_location_to_earth(output_data_mosaic) else: output_dateobs = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_datebeg = output_dateobs output_dateend = output_datebeg quickpunch_mosaic_wcs, quickpunch_mosaic_shape = load_trefoil_wcs() output_data_mosaic = NDCube( data=np.zeros(quickpunch_mosaic_shape), uncertainty=StdDevUncertainty(np.zeros(quickpunch_mosaic_shape)), wcs=quickpunch_mosaic_wcs, meta=NormalizedMetadata.load_template("CTM", "Q"), ) output_data_mosaic.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic.meta["DATE-AVG"] = output_dateobs output_data_mosaic.meta["DATE-OBS"] = output_dateobs output_data_mosaic.meta["DATE-BEG"] = output_datebeg output_data_mosaic.meta["DATE-END"] = output_dateend output_data_mosaic = set_spacecraft_location_to_earth(output_data_mosaic) if output_filename is not None: output_image_task(output_data_mosaic, output_filename[0]) logger.info("ending level quickPUNCH CQM core flow") return [output_data_mosaic]
[docs] @punch_flow def levelq_CTM_core_flow(data_list: list[str] | list[NDCube], # noqa: N802 before_f_corona_model_path: str, after_f_corona_model_path: str, output_filename: str | None = None, reference_time: datetime | None = None) -> list[NDCube]: # noqa: ARG001 """Level Q CTM flow.""" logger = get_run_logger() logger.info("beginning level Q CTM flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] for cube in data_list: # We'll want to grab the history we accumulate through this flow and put it in the final product, # but the per-file history up to now is kind of meaningless for the merged final product. if cube is not None: cube.meta.history.clear() data_list = [subtract_f_corona_background_task(d, before_f_corona_model_path, after_f_corona_model_path, allow_extrapolation=True) for d in data_list] out_list = [] for d in data_list: output_meta = NormalizedMetadata.load_template("CTM", "Q") o = NDCube(data=d.data, wcs=d.wcs, meta=output_meta, uncertainty=d.uncertainty) out_list.append(o) o.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] o.meta.history = d.meta.history o.meta["CALFCOR1"] = os.path.basename(before_f_corona_model_path) o.meta["CALFCOR2"] = os.path.basename(after_f_corona_model_path) for key in ["FILEVRSN", "ALL_INPT", "HAS_WFI1", "HAS_WFI2", "HAS_WFI3", "HAS_NFI4", "DATE-AVG", "DATE-OBS", "DATE-BEG", "DATE-END", "CTRXWFI1", "CTRYWFI1", "CTRXWFI2", "CTRYWFI2", "CTRXWFI3", "CTRYWFI3", "CTRXNFI4", "CTRYNFI4"]: o.meta[key] = d.meta[key].value set_spacecraft_location_to_earth(o) logger.info("ending level Q CTM flow") for o in out_list: o.meta.provenance = [fname for d in data_list if d is not None and (fname := d.meta.get("FILENAME").value)] if output_filename is not None: output_image_task(out_list[0], output_filename) return out_list
[docs] @punch_flow def levelq_QAM_core_flow(data_list: list[str] | list[NDCube], # noqa: N802 output_filename: str | None = None, reference_time: datetime | None = None) -> list[NDCube]: """Level Q QAM flow.""" logger = get_run_logger() logger.info("beginning level Q QAM flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] for cube in data_list: # We'll want to grab the history we accumulate through this flow and put it in the final product, # but the per-file history up to now is kind of meaningless for the merged final product. if cube is not None: cube.meta.history.clear() data_list = [create_low_noise_task(data_list, reference_time=reference_time)] out_list = [] for d in data_list: output_meta = NormalizedMetadata.load_template("QAM", "Q") o = NDCube(data=d.data, wcs=d.wcs, meta=output_meta, uncertainty=d.uncertainty) out_list.append(o) o.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] o.meta.history = d.meta.history for key in ["FILEVRSN", "ALL_INPT", "HAS_WFI1", "HAS_WFI2", "HAS_WFI3", "HAS_NFI4", "DATE-AVG", "DATE-OBS", "DATE-BEG", "DATE-END", "CTRXWFI1", "CTRYWFI1", "CTRXWFI2", "CTRYWFI2", "CTRXWFI3", "CTRYWFI3", "CTRXNFI4", "CTRYNFI4"]: o.meta[key] = d.meta[key].value set_spacecraft_location_to_earth(o) logger.info("ending level Q QAM flow") for o in out_list: o.meta.provenance = [fname for d in data_list if d is not None and (fname := d.meta.get("FILENAME").value)] if output_filename is not None: output_image_task(out_list[0], output_filename) return out_list