import warnings
from math import floor
from datetime import UTC, datetime
import astropy.units as u
import numpy as np
import remove_starfield
from astropy.io.fits import getheader
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from dateutil.parser import parse as parse_datetime_str
from ndcube import NDCollection, NDCube
from prefect import get_run_logger
from remove_starfield import ImageHolder, ImageProcessor, Starfield
from remove_starfield.reducers import GaussianReducer
from solpolpy import resolve
from solpolpy.util import solnorth_from_wcs
from punchbowl.data import NormalizedMetadata, load_ndcube_from_fits, write_ndcube_to_fits
from punchbowl.data.wcs import (
calculate_celestial_wcs_from_helio,
calculate_helio_wcs_from_celestial,
celestial_north_from_wcs,
)
from punchbowl.prefect import punch_flow, punch_task
from punchbowl.util import average_datetime
warnings.filterwarnings("ignore")
[docs]
def polarize_solar_to_celestial(input_data: NDCube) -> NDCube:
"""
Convert polarization from mzpsolar to Celestial frame.
All images need their polarization converted to Celestial frame
to generate the background starfield model.
"""
# Create a data collection for M, Z, P components
mzp_angles = [-60, 0, 60]*u.degree
ncols, nrows = input_data.data[0].shape
full_header = input_data.meta.to_fits_header(wcs=input_data.wcs,
write_celestial_wcs=not False)
wcs1 = WCS(full_header).dropaxis(2)
wcs2 = WCS(full_header, key="A").dropaxis(2)
# Converting polarization w.r.t. Celestial North
angle_solar_north = solnorth_from_wcs(wcs1, (nrows, ncols))
angle_celest_north = celestial_north_from_wcs(wcs2, (nrows, ncols))
zoff = (angle_celest_north.value - angle_solar_north.value) * u.degree
new_angles = np.stack([zoff - 60 * u.deg, zoff, zoff + 60 * u.deg])
collection_contents = [
(label,
NDCube(data=input_data[i].data,
wcs=wcs1,
meta={"POLAR": angle}))
for label, i, angle in zip(["M", "Z", "P"], [0, 1, 2], mzp_angles, strict=False)
]
data_collection = NDCollection(collection_contents, aligned_axes="all")
# Resolve data to celestial frame
celestial_data_collection = resolve(data_collection, "npol", out_angles=new_angles)
valid_keys = [key for key in celestial_data_collection if key != "alpha"]
new_data = [celestial_data_collection[key].data for key in valid_keys]
new_wcs = input_data.wcs.copy()
output_meta = NormalizedMetadata.load_template("PTM", "3")
output_meta["DATE-OBS"] = input_data.meta["DATE-OBS"].value
output = NDCube(data=new_data, wcs=new_wcs, meta=output_meta)
output.meta.history.add_now("LEVEL3-convert2celestial", "Convert mzpsolar to Celestial")
return output
[docs]
def polarize_celestial_to_solar(input_data: NDCube) -> NDCube:
"""
Convert polarization from Celestial frame to mzpsolar.
All images need their polarization converted back to Solar frame
after removing the stellar polarization.
"""
# Compute new angles for celestial frame
ncols, nrows = input_data.data[0].shape
full_header = input_data.meta.to_fits_header(wcs=input_data.wcs,
write_celestial_wcs=not False)
wcs1 = WCS(full_header).dropaxis(2)
wcs2 = WCS(full_header, key="A").dropaxis(2)
# Converting polarization w.r.t. Celestial North
angle_solar_north = solnorth_from_wcs(wcs1, (nrows, ncols))
angle_celest_north = celestial_north_from_wcs(wcs2, (nrows, ncols))
zoff = (angle_celest_north.value - angle_solar_north.value) * u.degree
new_angles = np.stack([zoff - 60 * u.deg, zoff, zoff + 60 * u.deg])
collection_contents = [
(f"{np.round(new_angles[i, nrows//2, ncols//2].value)} deg",
NDCube(data=input_data[i].data,
wcs=wcs1,
meta={"POLAR": angle}))
for i, angle in enumerate(new_angles)
]
data_collection = NDCollection(collection_contents, aligned_axes="all")
# Resolve data to mzpsolar frame
solar_data_collection = resolve(data_collection, "mzpsolar", in_angles=new_angles)
valid_keys = [key for key in solar_data_collection if key != "alpha"]
new_data = [solar_data_collection[key].data for key in valid_keys]
new_wcs = input_data.wcs.copy()
output_meta = NormalizedMetadata.load_template("PTM", "3")
output_meta["DATE-OBS"] = input_data.meta["DATE-OBS"].value
output = NDCube(data=new_data, wcs=new_wcs, meta=output_meta, uncertainty=input_data.uncertainty)
output.meta.history.add_now("LEVEL3-convert2mzpsolar", "Convert Celestial to mzpsolar")
return output
[docs]
class PUNCHImageProcessor(ImageProcessor):
"""Special loader for PUNCH data."""
def __init__(self, layer: int | None = None, apply_mask: bool = True, key: str = " ") -> None:
"""Create PUNCHImageProcessor."""
self.layer: int | None = layer
self.apply_mask = apply_mask
self.key = key
[docs]
def load_image(self, filename: str) -> ImageHolder:
"""Load an image."""
cube = load_ndcube_from_fits(filename, key=self.key)
if self.layer is None: # clear data
data = cube.data
else: # it's polarized
cube = polarize_solar_to_celestial(cube)
data = cube.data[self.layer]
if self.apply_mask:
data[data==0] = np.nan
return ImageHolder(data, cube.wcs.celestial, cube.meta)
[docs]
@punch_flow(log_prints=True, timeout_seconds=21_600)
def generate_starfield_background(
filenames: list[str],
map_scale: float = 0.01,
target_mem_usage: float = 1000,
n_procs: int | None = None,
reference_time: datetime | None = None,
is_polarized: bool = False,
out_file: str | None = None) -> NDCube | None :
"""Create a background starfield map from a series of PUNCH images over a long period of time."""
logger = get_run_logger()
if reference_time is None:
reference_time = datetime.now(UTC)
elif isinstance(reference_time, str):
reference_time = parse_datetime_str(reference_time)
logger.info("construct_starfield_background started")
# create an empty array to fill with data
# open the first file in the list to ge the shape of the file
if len(filenames) == 0:
msg = "filenames cannot be empty"
raise ValueError(msg)
shape = [floor(132 / map_scale), floor(360 / map_scale)]
starfield_wcs = WCS(naxis=2)
# n.b. it seems the RA wrap point is chosen so there's 180 degrees
# included on either side of crpix
starfield_wcs.wcs.crpix = [shape[1] / 2 + .5, shape[0] / 2 + .5]
starfield_wcs.wcs.crval = 270, -23.5
starfield_wcs.wcs.cdelt = map_scale, map_scale
starfield_wcs.wcs.ctype = "RA---CAR", "DEC--CAR"
starfield_wcs.wcs.cunit = "deg", "deg"
starfield_wcs.array_shape = shape
date_obses = [getheader(f, 1)["DATE-OBS"] for f in filenames]
times = [datetime.fromisoformat(d) for d in date_obses]
meta = NormalizedMetadata.load_template("PSM" if is_polarized else "CSM", "3")
meta["DATE-OBS"] = reference_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
meta["DATE-BEG"] = min(times).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
meta["DATE-END"] = max(times).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
meta["DATE-AVG"] = average_datetime(times).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
meta["DATE"] = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
if is_polarized:
logger.info("Starting m starfield")
starfield_m = remove_starfield.build_starfield_estimate(
filenames,
attribution=False,
frame_count=False,
reducer=GaussianReducer(),
starfield_wcs=starfield_wcs,
n_procs=n_procs,
processor=PUNCHImageProcessor(0, apply_mask=True, key="A"),
target_mem_usage=target_mem_usage)
logger.info("Ending m starfield")
logger.info("Starting z starfield")
starfield_z = remove_starfield.build_starfield_estimate(
filenames,
attribution=False,
frame_count=False,
reducer=GaussianReducer(),
starfield_wcs=starfield_wcs,
n_procs=n_procs,
processor=PUNCHImageProcessor(1, apply_mask=True, key="A"),
target_mem_usage=target_mem_usage)
logger.info("Ending z starfield")
logger.info("Starting p starfield")
starfield_p = remove_starfield.build_starfield_estimate(
filenames,
attribution=False,
frame_count=False,
reducer=GaussianReducer(),
starfield_wcs=starfield_wcs,
n_procs=n_procs,
processor=PUNCHImageProcessor(2, apply_mask=True, key="A"),
target_mem_usage=target_mem_usage)
logger.info("Ending p starfield")
out_data = np.stack([starfield_m.starfield, starfield_z.starfield, starfield_p.starfield], axis=0)
out_wcs = calculate_helio_wcs_from_celestial(starfield_m.wcs, meta.astropy_time, starfield_m.starfield.shape)
else:
logger.info("Starting clear starfield")
starfield_clear = remove_starfield.build_starfield_estimate(
filenames,
attribution=False,
frame_count=False,
reducer=GaussianReducer(),
starfield_wcs=starfield_wcs,
n_procs=n_procs,
processor=PUNCHImageProcessor(None, apply_mask=True, key="A"),
target_mem_usage=target_mem_usage)
logger.info("Ending clear starfield")
out_data = starfield_clear.starfield
out_wcs = calculate_helio_wcs_from_celestial(starfield_clear.wcs,
meta.astropy_time,
starfield_clear.starfield.shape)
# TODO - Replace uncertainty below with values folded through starfield estimation logic
output = NDCube(data=out_data, uncertainty=StdDevUncertainty(np.sqrt(out_data)), wcs=out_wcs, meta=meta)
output.meta.history.add_now("LEVEL3-starfield_background", "constructed starfield_bg model")
logger.info("construct_starfield_background finished")
if out_file is not None:
write_ndcube_to_fits(output, filename=out_file, write_hash=False, overwrite=True)
return None
return [output]
[docs]
@punch_task
def subtract_starfield_background_task(data_object: NDCube,
starfield_background_path: str | None,
is_polarized: bool = False) -> NDCube:
"""
Subtracts a background starfield from an input data frame.
checks the dimensions of input data frame and background starfield match and
subtracts the background starfield from the data frame of interest.
Parameters
----------
data_object : NDCube
A NDCube data frame to be background subtracted
starfield_background_path : str
path to a NDCube background starfield map
is_polarized : bool
whether the data is polarized
Returns
-------
NDCube
A background starfield subtracted data frame
"""
logger = get_run_logger()
logger.info("subtract_starfield_background started")
if starfield_background_path is None:
output = data_object
output.meta.history.add_now("LEVEL3-subtract_starfield_background",
"starfield subtraction skipped since path is empty")
else:
star_datacube = load_ndcube_from_fits(starfield_background_path)
wcs_celestial = calculate_celestial_wcs_from_helio(star_datacube.wcs)
wcs_celestial.wcs.cdelt[0] = wcs_celestial.wcs.cdelt[0] * -1
original_mask = data_object.data == 0
if is_polarized:
starfield_model_m = Starfield(np.stack((star_datacube.data[0], star_datacube.uncertainty.array[0])),
wcs_celestial[0])
subtracted_m = starfield_model_m.subtract_from_image(
NDCube(data=np.stack((data_object.data[0], data_object.uncertainty.array[0])),
wcs=data_object.wcs[0],
meta=data_object.meta),
processor=PUNCHImageProcessor(layer=0, key="A"))
starfield_model_z = Starfield(np.stack((star_datacube.data[1], star_datacube.uncertainty.array[1])),
wcs_celestial[1])
subtracted_z = starfield_model_z.subtract_from_image(
NDCube(data=np.stack((data_object.data[1], data_object.uncertainty.array[1])),
wcs=data_object.wcs[1],
meta=data_object.meta),
processor=PUNCHImageProcessor(layer=1, key="A"))
starfield_model_p = Starfield(np.stack((star_datacube.data[2], star_datacube.uncertainty.array[2])),
wcs_celestial[2])
subtracted_p = starfield_model_p.subtract_from_image(
NDCube(data=np.stack((data_object.data[2], data_object.uncertainty.array[2])),
wcs=data_object.wcs[2],
meta=data_object.meta),
processor=PUNCHImageProcessor(layer=2, key="A"))
data_object.data[...] = np.stack([subtracted_m.subtracted[0],
subtracted_z.subtracted[0],
subtracted_p.subtracted[0]], axis=0)
data_object.uncertainty.array[...] = np.sqrt(data_object.uncertainty.array**2 +
np.stack([subtracted_m.subtracted[1],
subtracted_z.subtracted[1],
subtracted_p.subtracted[1]], axis=0)**2)
else:
starfield_model = Starfield(np.stack((star_datacube.data, star_datacube.uncertainty.array)), wcs_celestial)
subtracted = starfield_model.subtract_from_image(
NDCube(data=np.stack((data_object.data, data_object.uncertainty.array)),
wcs=data_object.wcs,
meta=data_object.meta),
processor=PUNCHImageProcessor(key="A"))
data_object.data[...] = subtracted.subtracted[0]
data_object.uncertainty.array[...] = np.sqrt(data_object.uncertainty.array**2 +
subtracted.subtracted[1]**2)
# Reset the data to be zero in invalid regions
data_object.data[original_mask] = 0
data_object.data[~np.isfinite(data_object.data)] = 0
data_object.meta.history.add_now("LEVEL3-subtract_starfield_background", "subtracted starfield background")
output = polarize_celestial_to_solar(data_object) if is_polarized else data_object
logger.info("subtract_starfield_background finished")
return output
[docs]
def create_empty_starfield_background(data_object: NDCube) -> np.ndarray:
"""Create an empty starfield background map."""
return np.zeros_like(data_object.data)