Source code for punchbowl.level3.flow
import os
from datetime import UTC, datetime
from ndcube import NDCube
from prefect import get_run_logger
from punchbowl.data.meta import NormalizedMetadata, set_spacecraft_location_to_earth
from punchbowl.level3.f_corona_model import subtract_f_corona_background_task
from punchbowl.level3.low_noise import create_low_noise_task
from punchbowl.level3.polarization import convert_polarization
from punchbowl.level3.stellar import subtract_starfield_background_task
from punchbowl.level3.velocity import plot_flow_map, track_velocity
from punchbowl.prefect import punch_flow
from punchbowl.util import load_image_task, output_image_task
[docs]
@punch_flow
def level3_PIM_flow(data_list: list[str] | list[NDCube],  # noqa: N802
                    before_f_corona_model_path: str,
                    after_f_corona_model_path: str,
                    output_filename: str | None = None,
                    reference_time: datetime | None = None) -> list[NDCube]:  # noqa: ARG001
    """
    Level 3 PIM/CIM flow.

    Loads the inputs, subtracts the F-corona background from each of them and
    repackages every result as a new cube carrying Level 3 template metadata.

    Parameters
    ----------
    data_list : list[str] | list[NDCube]
        Inputs to process. String entries are passed to ``load_image_task``;
        any other entry is used as-is.
    before_f_corona_model_path : str
        Forwarded to ``subtract_f_corona_background_task``.
    after_f_corona_model_path : str
        Forwarded to ``subtract_f_corona_background_task``.
    output_filename : str | None
        When not None, the FIRST output cube (only) is written with ``output_image_task``.
    reference_time : datetime | None
        Accepted but not used in this function.

    Returns
    -------
    list[NDCube]
        One output cube per input, in input order.

    Raises
    ------
    IndexError
        If ``data_list`` is empty, because the product type is read from the first entry.

    """
    logger = get_run_logger()
    logger.info("beginning level 3 PIM/CIM flow")

    data_list = [load_image_task(d) if isinstance(d, str) else d for d in data_list]

    # The product type is decided by the first input alone: "CT" -> "CIM", anything else -> "PIM".
    new_type = "CIM" if data_list[0].meta["TYPECODE"].value == "CT" else "PIM"

    data_list = [subtract_f_corona_background_task(d,
                                                   before_f_corona_model_path,
                                                   after_f_corona_model_path) for d in data_list]

    out_list = []
    for d in data_list:
        # Load a fresh template for every cube. Sharing one metadata object between
        # all outputs would make each iteration overwrite the DATE-* values of the
        # previous cubes, leaving every output with the last input's dates.
        output_meta = NormalizedMetadata.load_template(new_type, "3")
        # Only the data array and WCS are carried over from the subtracted cube.
        o = NDCube(data=d.data, wcs=d.wcs, meta=output_meta)

        # DATE is the creation time, truncated from microseconds to milliseconds.
        o.meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
        o.meta["DATE-AVG"] = d.meta["DATE-AVG"].value
        o.meta["DATE-OBS"] = d.meta["DATE-OBS"].value
        o.meta["DATE-BEG"] = d.meta["DATE-BEG"].value
        o.meta["DATE-END"] = d.meta["DATE-END"].value

        # Keep the cube returned by the helper instead of rebinding a loop variable
        # and discarding it; this is correct whether the helper mutates in place
        # or builds a new cube.
        out_list.append(set_spacecraft_location_to_earth(o))

    logger.info("ending level 3 PIM/CIM flow")

    if output_filename is not None:
        output_image_task(out_list[0], output_filename)

    return out_list
[docs]
@punch_flow
def level3_core_flow(data_list: list[str] | list[NDCube],
                     before_f_corona_model_path: str,
                     after_f_corona_model_path: str,
                     starfield_background_path: str | None,
                     output_filename: str | None = None) -> list[NDCube]:
    """
    Level 3 core flow.

    Runs the inputs through F-corona background subtraction and then starfield
    background subtraction, one stage at a time over the whole list. When the
    first cube's ``TYPECODE`` is ``"PT"`` every cube is additionally passed
    through ``convert_polarization``.

    Parameters
    ----------
    data_list : list[str] | list[NDCube]
        Inputs to process. String entries are passed to ``load_image_task``;
        any other entry is used as-is.
    before_f_corona_model_path : str
        Forwarded to ``subtract_f_corona_background_task``.
    after_f_corona_model_path : str
        Forwarded to ``subtract_f_corona_background_task``.
    starfield_background_path : str | None
        Forwarded unchanged (including None) to ``subtract_starfield_background_task``.
    output_filename : str | None
        When not None, the FIRST resulting cube (only) is written with ``output_image_task``.

    Returns
    -------
    list[NDCube]
        The processed cubes, in input order.

    """
    flow_logger = get_run_logger()
    flow_logger.info("beginning level 3 core flow")

    cubes = [load_image_task(item) if isinstance(item, str) else item for item in data_list]

    # Stage 1: F-corona background removal for every cube.
    cubes = [
        subtract_f_corona_background_task(cube, before_f_corona_model_path, after_f_corona_model_path)
        for cube in cubes
    ]

    # Stage 2: starfield background removal for every cube.
    cubes = [subtract_starfield_background_task(cube, starfield_background_path) for cube in cubes]

    # The polarization decision is made from the first cube alone.
    needs_polarization_conversion = cubes[0].meta["TYPECODE"].value == "PT"
    if needs_polarization_conversion:
        cubes = [convert_polarization(cube) for cube in cubes]

    flow_logger.info("ending level 3 core flow")

    if output_filename is not None:
        first_cube = cubes[0]
        output_image_task(first_cube, output_filename)

    return cubes
[docs]
@punch_flow
def generate_level3_low_noise_flow(data_list: list[str] | list[NDCube],
                                   output_filename: str | None = None) -> list[NDCube]:
    """
    Generate low noise products.

    Parameters
    ----------
    data_list : list[str] | list[NDCube]
        Inputs to combine. String entries are passed to ``load_image_task``;
        any other entry is used as-is.
    output_filename : str | None
        When not None, the result is written with ``output_image_task``.

    Returns
    -------
    list[NDCube]
        A single-element list holding the result of ``create_low_noise_task``.

    """
    flow_logger = get_run_logger()
    flow_logger.info("Generating low noise products")

    cubes = [load_image_task(item) if isinstance(item, str) else item for item in data_list]
    product = create_low_noise_task(cubes)

    should_write = output_filename is not None
    if should_write:
        output_image_task(product, output_filename)

    return [product]
[docs]
@punch_flow
def generate_level3_velocity_flow(data_list: list[str],
                                  output_filename: str | None = None) -> list[NDCube]:
    """
    Generate Level 3 velocity data product.

    Parameters
    ----------
    data_list : list[str]
        Passed unchanged to ``track_velocity``.
    output_filename : str | None
        When not None, the velocity data is written to this path with
        ``output_image_task`` and a flow map is plotted to the same path with
        its extension replaced by ``.png``.

    Returns
    -------
    list[NDCube]
        A single-element list holding the velocity data from ``track_velocity``.

    """
    flow_logger = get_run_logger()
    flow_logger.info("Generating velocity data product")

    velocity_data, plot_parameters = track_velocity(data_list)
    result = [velocity_data]

    # Nothing to write: hand back the product without touching the filesystem.
    if output_filename is None:
        return result

    output_image_task(velocity_data, output_filename)

    # The plot sits next to the data file: same stem, ".png" extension.
    stem, _extension = os.path.splitext(output_filename)
    plot_filename = f"{stem}.png"
    plot_flow_map(plot_filename, **plot_parameters)

    return result