from datetime import datetime
import astropy.units as u
import cv2 as cv
import matplotlib as mpl
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from ndcube import NDCube
from sunpy.sun import constants
from punchbowl.data import load_ndcube_from_fits
from punchbowl.data.meta import NormalizedMetadata
[docs]
def calc_ylims(ycen_band_rs: np.ndarray, r_band_width: float, arcsec_per_px: float) -> tuple[int, int]:
"""
Convert y-coordinates of lower and upper row of bands to array indices for slicing.
Parameters
----------
ycen_band_rs : np.ndarray
y-coordinates of center of band in solar radii
r_band_width : float
Half-width of each radial band in solar radii
arcsec_per_px : float
Radial pixel scale in arcsec/px in the polar-remapped images
Returns
-------
list
Lower and upper Numpy array indices of the radial band
"""
# Unless we have a cropped image, bottom axis of the polar transform should be at 0 Rs.
origin_rs = 0
origin_arcsec = origin_rs * arcsec_per_px
ycen_band_arcsec = ycen_band_rs * constants.average_angular_size.value # center of the radial band in arcsec
rband_width_arcsec = r_band_width * constants.average_angular_size.value # width of the radial band in arcsec
ylo_band_idx = ((ycen_band_arcsec - rband_width_arcsec) - origin_arcsec) / arcsec_per_px # lower index of the band
yhi_band_idx = ((ycen_band_arcsec + rband_width_arcsec) - origin_arcsec) / arcsec_per_px # upper index of the band
return [ylo_band_idx, yhi_band_idx]
[docs]
def preprocess_image(data: NDCube, max_radius_px: int, num_azimuth_bins: int, az_bin: int) -> np.ndarray:
"""
Normalize and preprocess FITS image by removing bad values and scaling.
Parameters
----------
data: NDCube
Input data NDCube
max_radius_px : int
Maximum radius to include for polar remapping
num_azimuth_bins : int
Number of azimuthal samples to use in polar remapping
az_bin: int
Binning factor for binning the polar remapped image over the azimuth. The binning rule is currently numpy.mean()
Returns
-------
np.ndarray, dict
- Preprocecess polar-remapped image
- associated metadata
"""
# Replace with appropriate preprocessing needed to clean-up. We need to have finite values for the polar remap
image = data.data[0,...]
header = data.meta.to_fits_header(wcs=data.wcs)
image[~np.isfinite(image)] = 0
polar_image = cv.warpPolar(image.astype(np.float64), [int(max_radius_px), int(num_azimuth_bins)],
[header["CRPIX1"], header["CRPIX2"]], max_radius_px, cv.INTER_CUBIC)
polar_image_binned = polar_image.T.reshape([polar_image.shape[1],
polar_image.shape[0] // az_bin, az_bin]).mean(axis=2)
# Remove background radially by taking the mean over the radial axis (From Craig's original implementation)
polar_image_binned_radial_bkg = polar_image_binned - np.mean(polar_image_binned, axis=0)
# Flat-fielding further: dividing by RMS value along the radial axis
ff_rms = np.sqrt(np.mean(polar_image_binned_radial_bkg ** 2, axis=0))
processed_image = polar_image_binned_radial_bkg / ff_rms
# Clean-up, as divide by zero will occur
processed_image[~np.isfinite(processed_image)] = 0
polar_header = {
"NAXIS": 2,
"CTYPE1": "HPLN-CAR",
"NAXIS1": processed_image.shape[1],
"CDELT1": 360 / processed_image.shape[1],
"CUNIT1": "deg",
"CRPIX1": 0.5,
"CRVAL1": 0,
"CTYPE2": "HPLT-CAR",
"NAXIS2": processed_image.shape[0],
"CDELT2": 45 * 3600 / max_radius_px,
"CUNIT2": "arcsec",
"CRPIX2": processed_image.shape[0]//2 + 0.5,
"CRVAL2": (processed_image.shape[0]//2 + 0.5) * (45 * 3600 / max_radius_px),
"DATE-OBS": header["DATE-OBS"],
}
return processed_image, polar_header
[docs]
def calculate_cross_correlation(image1: np.ndarray, image2: np.ndarray,
offsets: np.ndarray, delta_px: int,
central_offset: int) -> np.ndarray:
"""
Perform cross-correlation for a range of offsets.
Parameters
----------
image1 : np.ndarray
First image array for correlation
image2 : np.ndarray
Second, time-offset image array for correlation
offsets : np.ndarray
Array of pixel offsets to iterate over for cross-correlation
delta_px : int
Pixel offset increment between samples
central_offset : int
Central offset from which to start correlation
Returns
-------
np.ndarray
Accumulated cross-correlation array over all offsets
"""
# Initialize accumulator array
acc = np.zeros((len(offsets), image1.shape[0], image1.shape[1]), dtype=float)
for jj, offset_index in enumerate(offsets):
# The two images need to be shifted from each other at each iteration.
# We first calculate the overall shift, then divide it in two parts, each part being used to shift each image
this_of = int(delta_px * (offset_index - (len(offsets) - 1) / 2)) + central_offset
# Amount of shift for image
offset_1 = int(this_of / 2)
offset_2 = int(this_of) - offset_1
# Padding the images symmetrically according to the direction of the shift. The padding rule is to replicate the
# values at the nearest edge
if offset_1 < 0:
padded_image1 = np.pad(image1, ((0, -offset_1), (0, 0)), mode="edge")[abs(offset_1):image1.shape[0] +
abs(offset_1), :]
else:
padded_image1 = np.pad(image1, ((offset_1, 0), (0, 0)), mode="edge")[:image1.shape[0], :]
if offset_2 < 0:
padded_image2 = np.pad(image2, ((-offset_2, 0), (0, 0)), mode="edge")[:image2.shape[0], :]
else:
padded_image2 = np.pad(image2, ((0, offset_2), (0, 0)), mode="edge")[offset_2:image2.shape[0] + offset_2, :]
acc[jj, :, :] += padded_image1 * padded_image2
return acc
[docs]
def accumulate_cross_correlation_across_frames(files: list, delta_t: int, sparsity: int, n_ofs: int,
max_radius_deg: int, num_azimuth_bins: int, az_bin: int,
delta_px: int, central_offset: int) -> np.ndarray:
"""
Accumulate cross-correlation across frames in a list of FITS files.
Parameters
----------
files : list
List of file paths to FITS files
delta_t : int
Frame offset (in frames) between time-offset image pairs
sparsity : int
Interval between frames to skip when accumulating cross-correlation
n_ofs : int
Number of pixel offsets to use in cross-correlation
max_radius_deg : int
Maximum radius in degrees to include for polar remapping
num_azimuth_bins : int
Number of azimuthal samples to use in polar remapping
az_bin: int
Binning factor for binning the polar remapped image over the azimuth. The binning rule is currently numpy.mean()
delta_px : int
Pixel offset increment between samples
central_offset : int
Central offset from which to start correlation
Returns
-------
np.ndarray
Accumulated cross-correlation array over all frames and offsets
"""
data = load_ndcube_from_fits(files[0])
header = data.meta.to_fits_header(wcs=data.wcs)
max_radius_px = max_radius_deg / header["CDELT1"]
polar_sample, _ = preprocess_image(data, max_radius_px, num_azimuth_bins, az_bin)
acc = np.zeros((n_ofs, polar_sample.shape[0], polar_sample.shape[1]), dtype=float)
n = 0
for i in range(0, len(files) - delta_t, delta_t * sparsity):
data1 = load_ndcube_from_fits(files[i])
data2 = load_ndcube_from_fits(files[i + delta_t])
prepped_image1, _ = preprocess_image(data1, max_radius_px, num_azimuth_bins, az_bin)
prepped_image2, _ = preprocess_image(data2, max_radius_px, num_azimuth_bins, az_bin)
acc += calculate_cross_correlation(prepped_image1, prepped_image2, np.arange(n_ofs), delta_px, central_offset)
n += 1
acc /= n
return acc
[docs]
def compute_all_bands(acc: np.ndarray, ycen_band_rs: np.ndarray, r_band_half_width: float,
arcsec_per_px: float, velocity_azimuth_bins: int,
x_kps: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
Compute speed and sigma for all radial bands.
Parameters
----------
acc : np.ndarray
Cross-correlation array accumulated across frames
ycen_band_rs : np.ndarray
y-coordinates of band centers in solar radii
r_band_half_width : float
Half-width of each radial band in solar radii
arcsec_per_px : float
Radial pixel scale in arcsec/px in the polar-remapped images
velocity_azimuth_bins : int
Number of azimuthal bins in the output flow maps
x_kps : np.ndarray
Array mapping pixel offsets to speed in km/s
Returns
-------
tuple
Tuple containing:
- np.ndarray : Average speed per angular bin for each radial band
- np.ndarray : Sigma (standard deviation) of speed per angular bin for each radial band
"""
ylohi = calc_ylims(ycen_band_rs, r_band_half_width, arcsec_per_px)
# Determine spike location (index of the correlation peak) in the cross-correlation array
spike_location = np.where(x_kps < 0)[0].max() + 2
avg_speeds = []
sigmas = []
for (ylo, yhi) in zip(*ylohi, strict=False):
acc_k = acc[:, int(ylo):int(yhi) + 1, ...].mean(axis=1)
# The modulus must be zero
azimuth_bin_size = acc_k.shape[1] // velocity_azimuth_bins
avcor_rbins_theta = acc_k.reshape(acc_k.shape[0], azimuth_bin_size, velocity_azimuth_bins)
speedmax_idx_per_thbin = np.array(
[avcor_rbins_theta[spike_location:, :, i].argmax(axis=0) +
spike_location for i in range(velocity_azimuth_bins)])
speedmax_per_theta = x_kps[speedmax_idx_per_thbin]
avg_speeds.append(speedmax_per_theta.mean(axis=1))
sigmas.append(speedmax_per_theta.std(axis=1) / np.sqrt(azimuth_bin_size))
return np.array(avg_speeds), np.array(sigmas)
[docs]
def process_corr(files: list, arcsec_per_px:float, expected_kps_windspeed: float, delta_t: int, sparsity: int,
delta_px: int, ycens: np.ndarray, r_band_half_width: float, n_ofs: int, max_radius_deg: int,
num_azimuth_bins: int, az_bin: int, velocity_azimuth_bins: int) -> tuple[np.ndarray, np.ndarray]:
"""
Process the cross-correlation across frames in a list of FITS files with associated average speeds.
Parameters
----------
files : list
List of file paths to FITS files
arcsec_per_px: float
pixel scale in arcsec over the radial axis in the polar-remapped image
expected_kps_windspeed: float
Expected Wind Speed in km/s for narrowing the cross-correlation
delta_t : float
Time offset (in nb of frames) between for an image pair
sparsity : int
Interval between frames to skip when accumulating cross-correlation
delta_px : int
Pixel offset increment between samples
ycens: np.ndarray
y-coordinates of center of bands in solar radii
r_band_half_width : float
Half-width of each radial band in solar radii
n_ofs : int
Number of pixel offsets to use in cross-correlation
max_radius_deg : int
Maximum radius in degrees to include for polar remapping
num_azimuth_bins : int
Number of azimuthal samples to use in polar remapping
az_bin: int
Binning factor for binning the polar remapped image over the azimuth. The binning rule is currently numpy.mean()
velocity_azimuth_bins : int
Number of azimuthal bins in the output flow maps
Returns
-------
[np.ndarray, np.ndarray]
Average speed and 1-sigma uncertainty over radius and angular bins
"""
# Expected windspeed in pixels
time_cadence_sec = 4 * 60 # time cadence of punch images in seconds (4 minutes)
arcsec_km = (1*u.arcsec).to(u.rad).value * 1*u.au.to(u.km)
expected_px_windspeed = expected_kps_windspeed / (arcsec_per_px * arcsec_km ) * time_cadence_sec
# Central offset to start correlation from.
central_offset = int(delta_t * expected_px_windspeed)
# Calculate speed mapping for offsets in km/s
x_pix = delta_px * (np.arange(n_ofs) - (n_ofs - 1) / 2) + central_offset
x_kps = x_pix / central_offset * expected_kps_windspeed
# Accumulate cross-correlation across frames
acc = accumulate_cross_correlation_across_frames(files, delta_t, sparsity, n_ofs, max_radius_deg, num_azimuth_bins,
az_bin, delta_px, central_offset)
# Compute average speeds and sigma for each radial band and latitudinal bin
avg_speeds, sigmas = compute_all_bands(acc, ycens, r_band_half_width, arcsec_per_px, velocity_azimuth_bins, x_kps)
return avg_speeds, sigmas
[docs]
def plot_flow_map(filename: str | None, data: NDCube, cmap: str = "magma") -> None:
"""
Plot polar maps of the radial flows.
Parameters
----------
filename: str
Output plot filename. If None, the figure is not saved out.
data: NDCube
Flow tracking data NDCube
cmap : str, optional
Colormap for the plot (default is 'magma')
Returns
-------
fig
The generated Matplotlib Figure
"""
speeds = data.data
sigmas = data.uncertainty.array
ycen_band_rs = np.fromstring(data.meta["YCENS"].value[1:-1], dtype=float, sep=",")
rbands = np.fromstring(data.meta["RBANDS"].value[1:-1], dtype=int, sep=",")
velocity_azimuth_bins = data.meta["PLTBINS"].value
thetas = np.linspace(0, 2 * np.pi, velocity_azimuth_bins + 1)
plt.close("all")
fig = plt.figure(figsize=(20, 8))
vmin = speeds.min()
vmax = speeds.max()
norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax, clip=True)
mapper = cm.ScalarMappable(norm=norm, cmap=cmap)
for i, ridx in enumerate(rbands):
signal = np.append(speeds[ridx], speeds[ridx][0])
error = np.append(sigmas[ridx], sigmas[ridx][0])
ax = fig.add_subplot(1, len(rbands), i + 1, projection="polar")
ax.plot(thetas, signal, "k-")
ax.fill_between(thetas, signal - error, signal + error, alpha=0.3, color="gray")
colors = np.array([mapper.to_rgba(v) for v in signal])
for theta, value, err, color in zip(thetas, signal, error, colors, strict=False):
ax.plot(theta, value, "o", color=color, ms=4)
ax.errorbar(theta, value, yerr=err, lw=2, capsize=3, color=color)
ax.set_title(f"Altitude = {ycen_band_rs[ridx]} Rs")
ax.set_ylim(50, 1.05 * vmax)
ax.set_rlabel_position(270)
cbar_ax = fig.add_axes([0.11, 0.2, 0.8, 0.03])
plt.colorbar(mapper, cax=cbar_ax, orientation="horizontal").ax.set_xlabel("Speed (km/s)")
if filename is not None:
plt.savefig(filename)
return fig
[docs]
def track_velocity(files: list[str],
delta_t: int = 12,
sparsity: int = 2,
n_ofs: int = 151,
delta_px: int = 2,
expected_kps_windspeed: int = 300,
r_band_half_width: float = 0.5,
max_radius_deg: int = 45,
num_azimuth_bins: int = 1440*8,
az_bin: int = 4,
velocity_azimuth_bins: int = 36,
ycens: np.ndarray | None = None,
rbands: list[int] | None = None) -> NDCube:
"""
Generate velocity map using flow tracking.
Parameters
----------
files : list[str]
List of file paths for input data
delta_t : int, optional
Time offset in frames between images
sparsity : int, optional
Frame skip interval for averaging
n_ofs : int, optional
Number of spatial offsets for cross-correlation
delta_px : int, optional
Pixel offset increment per sample
expected_kps_windspeed : int, optional
Expected wind speed in km/s
r_band_half_width : float, optional
Half-width of each radial band in solar radii
max_radius_deg : int, optional
The maximum radius in degrees
num_azimuth_bins : int, optional
Number of azimuthal bins in the polar remapped images
az_bin : int, optional
Binning factor for binning the polar remapped image over the azimuth
velocity_azimuth_bins : int, optional
Number of azimuthal bins in the output flow maps
ycens : numpy.ndarray, optional
Radial band centers in solar radii
rbands : list[int], optional
Indices of radial bands to visualize
Returns
-------
ndcube.NDCube
The generated velocity map
"""
# Set defaults for missing input parameters
if ycens is None:
ycens = np.arange(7, 14.5, 0.5)
if rbands is None:
rbands = [0, 4, 8, 14]
files.sort()
if len(files) < 2:
msg = "At least to input files must be provided for flow tracking"
raise ValueError(msg)
# Data preprocessing
data0 = load_ndcube_from_fits(files[0])
header1 = data0.meta.to_fits_header(wcs=data0.wcs)
_, polar_header1 = preprocess_image(data0, max_radius_deg/header1["CDELT1"], num_azimuth_bins, az_bin)
avg_speeds, sigmas = process_corr(files, polar_header1["CDELT2"], expected_kps_windspeed,
delta_t, sparsity, delta_px, ycens, r_band_half_width, n_ofs,
max_radius_deg, num_azimuth_bins, az_bin, velocity_azimuth_bins)
output_meta = NormalizedMetadata.load_template("VAM", "3")
with fits.open(files[0]) as hdul:
output_meta["DATE-BEG"] = hdul[1].header["DATE-BEG"]
with fits.open(files[-1]) as hdul:
output_meta["DATE-END"] = hdul[1].header["DATE-END"]
date_beg = datetime.strptime(output_meta["DATE-BEG"].value, "%Y-%m-%dT%H:%M:%S").astimezone()
date_end = datetime.strptime(output_meta["DATE-END"].value, "%Y-%m-%dT%H:%M:%S").astimezone()
date_avg = (date_beg + (date_end - date_beg) / 2).strftime("%Y-%m-%dT%H:%M:%S")
output_meta["DATE-AVG"] = date_avg
output_meta["DATE-OBS"] = date_avg
output_meta["DELTAT"] = delta_t
output_meta["SPARSITY"] = sparsity
output_meta["N_OFS"] = n_ofs
output_meta["DELTA_PX"] = delta_px
output_meta["KPSEXP"] = expected_kps_windspeed
output_meta["BANDWDTH"] = r_band_half_width
output_meta["MAXRAD"] = max_radius_deg
output_meta["AZMBINS"] = num_azimuth_bins
output_meta["AZMBINF"] = az_bin
output_meta["PLTBINS"] = velocity_azimuth_bins
output_meta["YCENS"] = np.array2string(ycens, separator=",", max_line_width=10_000)
output_meta["RBANDS"] = np.array2string(np.array(rbands), separator=",", max_line_width=10_000)
wcs = WCS(naxis=2)
wcs.wcs.ctype = "radius","azimuth"
wcs.wcs.cunit = "solRad","rad"
wcs.wcs.cdelt = 1, 0.5
wcs.wcs.crpix = 0, 0
wcs.wcs.crval = 0, 0
wcs.wcs.cname = "solar radii", "azimuth"
wcs.array_shape = avg_speeds.shape
return NDCube(data = avg_speeds,
uncertainty=StdDevUncertainty(sigmas),
meta = output_meta,
wcs = wcs)