Source code for punchbowl.levelq.flow

import os
from datetime import UTC, datetime

import numpy as np
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS

from punchbowl.data import NormalizedMetadata, get_base_file_name
from punchbowl.data.meta import set_spacecraft_location_to_earth
from punchbowl.data.punch_io import load_many_cubes
from punchbowl.data.punchcube import PUNCHCube
from punchbowl.data.wcs import load_trefoil_wcs
from punchbowl.level2.merge import merge_many_polarized_task
from punchbowl.level2.preprocess import preprocess_trefoil_inputs
from punchbowl.level2.resample import find_central_pixel, reproject_many_flow
from punchbowl.level3.f_corona_model import subtract_f_corona_background_task
from punchbowl.level3.low_noise import create_low_noise_task
from punchbowl.levelq.pca import pca_filter
from punchbowl.prefect import get_logger, punch_flow
from punchbowl.util import DataLoader, average_datetime, find_first_existing_file, load_image_task, output_image_task

ORDER_QP = ["QM1", "QZ1", "QP1",
            "QM2", "QZ2", "QP2",
            "QM3", "QZ3", "QP3"]

SPACECRAFT_OBSCODE = {"1": "WFI1",
                      "2": "WFI2",
                      "3": "WFI3",
                      "4": "NFI4",
                      "N": "NFI4"}

[docs] @punch_flow def levelq_QNN_core_flow(data_list: list[str] | list[PUNCHCube], #noqa: N802 output_filename: list[str] | None = None, files_to_fit: list[str | PUNCHCube | DataLoader] | None = None, data_root: str | None = None) -> list[PUNCHCube]: """ Run the LQ QNN flow. This flow is designed to run on a batch of input CR4 images to facilitate more efficient PCA fitting. Parameters ---------- data_list : list[str | PUNCHCube] The input images, either as paths or PUNCHCubes output_filename : list[str] Optional output paths at which the QNN files should be written files_to_fit : list[str | PUNCHCube | DataLoader] Additional files to use for the PCA fitting, but not to actually be filtered or output data_root : str The root directory which the paths in ``data_list`` are relative to Returns ------- output_cubes : list[PUNCHCube] The QNN data cubes """ logger = get_logger() logger.info("beginning level quickPUNCH QNN core flow") logger.info(f"Got {len(data_list)} input files and {len(files_to_fit)} extra files for fitting") output_cubes = [] data_cubes = [input_file for input_file in data_list if isinstance(input_file, PUNCHCube)] input_paths = [input_file for input_file in data_list if isinstance(input_file, str)] if data_root is not None: input_paths = [os.path.join(data_root, path) for path in input_paths] data_cubes += load_many_cubes(input_paths, n_workers=3) logger.info("Loaded images to be subtracted") pca_filter(data_cubes, files_to_fit) for i, data_cube in enumerate(data_cubes): data = data_cube.data uncertainty = data_cube.uncertainty.array isnan = np.isnan(data) uncertainty[isnan] = np.inf data[isnan] = 0 output_meta_nfi = NormalizedMetadata.load_template("QNN", "Q") output_cube = data_cube.replace(data=data, uncertainty=StdDevUncertainty(uncertainty), meta=output_meta_nfi) output_cube.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_cube.meta["DATE-AVG"] = data_cube.meta["DATE-AVG"].value output_cube.meta["DATE-OBS"] = data_cube.meta["DATE-OBS"].value output_cube.meta["DATE-BEG"] = data_cube.meta["DATE-BEG"].value output_cube.meta["DATE-END"] = data_cube.meta["DATE-END"].value output_cube.meta["FILEVRSN"] = data_cube.meta["FILEVRSN"].value set_spacecraft_location_to_earth(output_cube) output_cubes.append(output_cube) if output_filename is not None and i < len(output_filename) and output_filename[i] is not None: output_image_task(output_cube, output_filename[i]) logger.info("ending level quickPUNCH QNN core flow") return output_cubes
[docs] @punch_flow def levelq_CQM_core_flow(data_list: list[str] | list[PUNCHCube], #noqa: N802, C901 output_filename: list[str] | None = None, trim_edges_px: int = 0, alphas_file: str | None = None, trefoil_wcs: WCS | None = None, trefoil_shape: tuple[int, int] | None = None, ) -> list[PUNCHCube]: """Level quickPUNCH core flow.""" logger = get_logger() logger.info("beginning level quickPUNCH CQM core flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] if data_list: ordered_data_list: list[PUNCHCube | None] = [None for _ in range(len(ORDER_QP))] for i, order_element in enumerate(ORDER_QP): for data_element in data_list: typecode = data_element.meta["TYPECODE"].value obscode = data_element.meta["OBSCODE"].value if typecode == order_element[:2] and obscode == order_element[2]: ordered_data_list[i] = data_element logger.info("Ordered files are " f"{[get_base_file_name(cube) if cube is not None else None for cube in ordered_data_list]}") quickpunch_mosaic_wcs, quickpunch_mosaic_shape = load_trefoil_wcs() if trefoil_wcs is not None: quickpunch_mosaic_wcs = trefoil_wcs if trefoil_shape is not None: quickpunch_mosaic_shape = trefoil_shape preprocess_trefoil_inputs(data_list, [None] * len(data_list), trim_edges_px, alphas_file) data_list_mosaic = reproject_many_flow(ordered_data_list, quickpunch_mosaic_wcs, quickpunch_mosaic_shape) output_dateobs = average_datetime( [d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_datebeg = min([d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_dateend = max([d.meta.datetime for d in data_list_mosaic if d is not None], ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic = merge_many_polarized_task(data_list_mosaic, quickpunch_mosaic_wcs, level="Q", product_code="CQM") for d in filter(None, data_list_mosaic): spacecraft = SPACECRAFT_OBSCODE[d.meta["OBSCODE"].value] output_data_mosaic.meta[f"HAS_{spacecraft}"] = 1 output_data_mosaic.meta["ALL_INPT"] = {output_data_mosaic.meta["HAS_WFI1"], output_data_mosaic.meta["HAS_WFI2"], output_data_mosaic.meta["HAS_WFI3"], output_data_mosaic.meta["HAS_NFI4"]} == {1} centers = find_central_pixel(ordered_data_list, quickpunch_mosaic_wcs) for center, cube in zip(centers, ordered_data_list, strict=False): if center is None: continue cx, cy = center obs_no = cube.meta["OBSCODE"].value obs = "NFI" if obs_no == "N" else "WFI" obs_no = "4" if obs_no == "N" else obs_no output_data_mosaic.meta[f"CTRX{obs}{obs_no}"] = cx output_data_mosaic.meta[f"CTRY{obs}{obs_no}"] = cy output_data_mosaic.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic.meta["DATE-AVG"] = output_dateobs output_data_mosaic.meta["DATE-OBS"] = output_dateobs output_data_mosaic.meta["DATE-BEG"] = output_datebeg output_data_mosaic.meta["DATE-END"] = output_dateend output_data_mosaic.meta["FILEVRSN"] = find_first_existing_file(data_list).meta["FILEVRSN"].value output_data_mosaic = set_spacecraft_location_to_earth(output_data_mosaic) else: output_dateobs = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_datebeg = output_dateobs output_dateend = output_datebeg quickpunch_mosaic_wcs, quickpunch_mosaic_shape = load_trefoil_wcs() output_data_mosaic = PUNCHCube( data=np.zeros(quickpunch_mosaic_shape), uncertainty=StdDevUncertainty(np.zeros(quickpunch_mosaic_shape)), wcs=quickpunch_mosaic_wcs, meta=NormalizedMetadata.load_template("CTM", "Q"), ) output_data_mosaic.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] output_data_mosaic.meta["DATE-AVG"] = output_dateobs output_data_mosaic.meta["DATE-OBS"] = output_dateobs output_data_mosaic.meta["DATE-BEG"] = output_datebeg output_data_mosaic.meta["DATE-END"] = output_dateend output_data_mosaic = set_spacecraft_location_to_earth(output_data_mosaic) if output_filename is not None: output_image_task(output_data_mosaic, output_filename[0]) logger.info("ending level quickPUNCH CQM core flow") return [output_data_mosaic]
[docs] @punch_flow def levelq_CTM_core_flow(data_list: list[str] | list[PUNCHCube], # noqa: N802 before_f_corona_model_path: str, after_f_corona_model_path: str, output_filename: str | None = None, reference_time: datetime | None = None) -> list[PUNCHCube]: # noqa: ARG001 """Level Q CTM flow.""" logger = get_logger() logger.info("beginning level Q CTM flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] for cube in data_list: # We'll want to grab the history we accumulate through this flow and put it in the final product, # but the per-file history up to now is kind of meaningless for the merged final product. if cube is not None: cube.meta.history.clear() data_list = [subtract_f_corona_background_task(d, before_f_corona_model_path, after_f_corona_model_path, allow_extrapolation=True) for d in data_list] out_list = [] for d in data_list: output_meta = NormalizedMetadata.load_template("CTM", "Q") o = PUNCHCube(data=d.data, wcs=d.wcs, meta=output_meta, uncertainty=d.uncertainty) out_list.append(o) o.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] o.meta.history = d.meta.history o.meta["CALFCOR1"] = os.path.basename(before_f_corona_model_path) o.meta["CALFCOR2"] = os.path.basename(after_f_corona_model_path) for key in ["FILEVRSN", "ALL_INPT", "HAS_WFI1", "HAS_WFI2", "HAS_WFI3", "HAS_NFI4", "DATE-AVG", "DATE-OBS", "DATE-BEG", "DATE-END", "CTRXWFI1", "CTRYWFI1", "CTRXWFI2", "CTRYWFI2", "CTRXWFI3", "CTRYWFI3", "CTRXNFI4", "CTRYNFI4"]: o.meta[key] = d.meta[key].value set_spacecraft_location_to_earth(o) logger.info("ending level Q CTM flow") for o in out_list: o.meta.provenance = [fname for d in data_list if d is not None and (fname := d.meta.get("FILENAME").value)] if output_filename is not None: output_image_task(out_list[0], output_filename) return out_list
[docs] @punch_flow def levelq_QAM_core_flow(data_list: list[str] | list[PUNCHCube], # noqa: N802 output_filename: str | None = None, reference_time: datetime | None = None) -> list[PUNCHCube]: """Level Q QAM flow.""" logger = get_logger() logger.info("beginning level Q QAM flow") data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list] for cube in data_list: # We'll want to grab the history we accumulate through this flow and put it in the final product, # but the per-file history up to now is kind of meaningless for the merged final product. if cube is not None: cube.meta.history.clear() data_list = [create_low_noise_task(data_list, reference_time=reference_time)] out_list = [] for d in data_list: output_meta = NormalizedMetadata.load_template("QAM", "Q") o = PUNCHCube(data=d.data, wcs=d.wcs, meta=output_meta, uncertainty=d.uncertainty) out_list.append(o) o.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] o.meta.history = d.meta.history for key in ["FILEVRSN", "ALL_INPT", "HAS_WFI1", "HAS_WFI2", "HAS_WFI3", "HAS_NFI4", "DATE-AVG", "DATE-OBS", "DATE-BEG", "DATE-END", "CTRXWFI1", "CTRYWFI1", "CTRXWFI2", "CTRYWFI2", "CTRXWFI3", "CTRYWFI3", "CTRXNFI4", "CTRYNFI4"]: o.meta[key] = d.meta[key].value set_spacecraft_location_to_earth(o) logger.info("ending level Q QAM flow") for o in out_list: o.meta.provenance = [fname for d in data_list if d is not None and (fname := d.meta.get("FILENAME").value)] if output_filename is not None: output_image_task(out_list[0], output_filename) return out_list