diff --git a/pyproject.toml b/pyproject.toml
index 0df80b8..f327440 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,12 @@ description = "SolarMonitor Active Region Tracking (SMART)"
 requires-python = ">=3.9"
 readme = { file = "README.rst", content-type = "text/x-rst" }
 license = { file = "licenses/LICENSE.rst", content-type = "text/plain" }
-dependencies = []
+dependencies = [
+    "numpy",
+    "sunpy",
+    "scikit-image",
+    "matplotlib"
+]
 dynamic = ["version"]
 
 [project.optional-dependencies]
diff --git a/smart/characterization/PSL.py b/smart/characterization/PSL.py
new file mode 100644
index 0000000..2a6abf4
--- /dev/null
+++ b/smart/characterization/PSL.py
@@ -0,0 +1,34 @@
+import numpy as np
+import skimage as ski
+from skimage.morphology import disk
+
+import astropy.units as u
+
+
+@u.quantity_input
+def separate_polarities(im_map, feature_map, dilation_radius: u.Quantity[u.arcsec] = 2 * u.arcsec):
+    """
+    Separate a feature map into positive and negative polarity masks and dilate both.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed SunPy magnetogram map.
+    feature_map : numpy.ndarray
+        Signed feature data; positive and negative values mark the two magnetic polarities.
+    dilation_radius : `~astropy.units.Quantity`, optional
+        Radius of the disk used for binary dilation (default is 2 arcsec).
+
+    Returns
+    -------
+    dilated_posmask, dilated_negmask : numpy.ndarray
+        Dilated boolean masks of the positive and negative polarities.
+    """
+    posmask = (feature_map > 0).astype(int)
+    negmask = (feature_map < 0).astype(int)
+
+    # Convert the dilation radius from arcsec to pixels using the mean plate scale.
+    arcsec_to_pixel = ((im_map.scale[0] + im_map.scale[1]) / 2) ** (-1)
+    dilation_radius = (np.round(dilation_radius * arcsec_to_pixel)).to_value(u.pix)
+
+    dilated_posmask = ski.morphology.binary_dilation(posmask, disk(dilation_radius))
+    dilated_negmask = ski.morphology.binary_dilation(negmask, disk(dilation_radius))
+    return dilated_posmask, dilated_negmask
+
+
+def PSL_length(dilated_posmask, dilated_negmask):
+    """
+    Measure the length (in pixels) of the polarity separation line (PSL).
+    """
+    # The PSL is the overlap between the dilated positive and negative masks,
+    # thinned down to a single-pixel-wide line.
+    PSL_mask = dilated_posmask & dilated_negmask
+    PSL_thinmask = ski.morphology.thin(PSL_mask)
+    PSL_length = np.sum(PSL_thinmask)
+    return PSL_length
diff --git a/smart/segmentation/IGM.py b/smart/segmentation/IGM.py
new file mode 100644
index 0000000..c35d172
--- /dev/null
+++ b/smart/segmentation/IGM.py
@@ -0,0 +1,117 @@
+import matplotlib.pyplot as plt
+import numpy as np
+import skimage as ski
+from skimage.morphology import disk
+
+import astropy.units as u
+
+from sunpy.map import Map
+
+from smart.segmentation.map_processing import smooth_los_threshold
+
+
+@u.quantity_input
+def index_and_grow_mask(
+    current_map: Map, previous_map: Map, dilation_radius: u.Quantity[u.arcsec] = 2.5 * u.arcsec
+):
+    """
+    Perform indexing and growing of the mask.
+
+    Transient features are removed by comparing the mask at time 't' with the mask from
+    time 't - delta_t' differentially rotated to time 't'. ARs are then assigned ascending
+    integer values (starting from one) in order of decreasing size.
+
+    Parameters
+    ----------
+    current_map : Map
+        Processed magnetogram map from time 't'.
+    previous_map : Map
+        Processed magnetogram map from time 't - delta_t' differentially rotated to time 't'.
+    dilation_radius : `~astropy.units.Quantity`, optional
+        Radius of the disk used for binary dilation (default is 2.5 arcsec).
+
+    Returns
+    -------
+    sorted_labels : numpy.ndarray
+        Individual contiguous features are indexed by assigning ascending integer
+        values (beginning with one) in order of decreasing feature size.
+
+    """
+    pixel_size = (current_map.scale[0] + current_map.scale[1]) / 2
+    dilation_radius = (np.round(dilation_radius / pixel_size)).to_value(u.pix)
+
+    filtered_labels = smooth_los_threshold(current_map)[1]
+    filtered_labels_dt = smooth_los_threshold(previous_map)[1]
+
+    dilated_mask = ski.morphology.binary_dilation(filtered_labels, disk(dilation_radius))
+    dilated_mask_dt = ski.morphology.binary_dilation(filtered_labels_dt, disk(dilation_radius))
+
+    # Features present at time 't' but absent from the rotated 't - delta_t' mask are
+    # treated as transients, so only features present in both masks survive.
+    transient_features = dilated_mask & ~dilated_mask_dt
+    final_mask = dilated_mask & ~transient_features
+    final_labels = ski.measure.label(final_mask)
+
+    regions = ski.measure.regionprops(final_labels)
+    region_sizes = [(region.label, region.area) for region in regions]
+
+    # Relabel so that label 1 is the largest feature, label 2 the next largest, and so on.
+    sorted_region_sizes = sorted(region_sizes, key=lambda x: x[1], reverse=True)
+    sorted_labels = np.zeros_like(final_labels)
+    for new_label, (old_label, _) in enumerate(sorted_region_sizes, start=1):
+        sorted_labels[final_labels == old_label] = new_label
+
+    return sorted_labels
+
+
+def plot_indexed_grown_mask(im_map: Map, sorted_labels, contours=True, labels=True, figtext=True):
+    """
+    Plot the fully processed and segmented magnetogram, optionally overlaying AR contours and labels.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed magnetogram map from time 't'.
+    sorted_labels : numpy.ndarray
+        Labelled mask of the detected regions, as returned by index_and_grow_mask.
+    contours : bool, optional
+        If True, contours of the detected regions are displayed on the map (default is True).
+    labels : bool, optional
+        If True, labels with the region numbers are overlaid on the regions (default is True).
+    figtext : bool, optional
+        If True, figtext with the total number of detected regions is displayed on the map (default is True).
+
+    Returns
+    -------
+    None.
+
+    """
+
+    fig = plt.figure()
+    ax = fig.add_subplot(projection=im_map)
+    im_map.plot(axes=ax)
+
+    unique_labels = np.unique(sorted_labels)
+    unique_labels = unique_labels[unique_labels != 0]
+
+    if contours:
+        for label_value in unique_labels:
+            region_mask = sorted_labels == label_value
+            ax.contour(region_mask, colors="purple", linewidths=1)
+
+    if labels:
+        for label_value in unique_labels:
+            region_mask = sorted_labels == label_value
+            region = ski.measure.regionprops(region_mask.astype(int))[0]
+            centroid = region.centroid
+            ax.text(
+                centroid[1],
+                centroid[0],
+                str(label_value),
+                color="red",
+                fontsize=12,
+                weight="bold",
+                ha="center",
+                va="center",
+            )
+
+    if figtext:
+        plt.figtext(0.47, 0.2, f"Number of regions = {len(unique_labels)}", color="white")
+
+    plt.show()
diff --git a/smart/segmentation/differential_rotation.py b/smart/segmentation/differential_rotation.py
new file mode 100644
index 0000000..01bec52
--- /dev/null
+++ b/smart/segmentation/differential_rotation.py
@@ -0,0 +1,27 @@
+from sunpy.coordinates import propagate_with_solar_surface
+from sunpy.map import Map
+
+
+def diff_rotation(im_map: Map, delta_im_map: Map):
+    """
+    Differentially rotate a map to correct for feature motion due to solar rotation.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed SunPy magnetogram map.
+    delta_im_map : Map
+        Processed SunPy magnetogram taken at a time Δt before im_map.
+
+    Returns
+    -------
+    diff_map : Map
+        delta_im_map differentially rotated to match im_map.
+    date : astropy.time.Time
+        Observation date of delta_im_map.
+
+    """
+
+    with propagate_with_solar_surface():
+        diff_map = delta_im_map.reproject_to(im_map.wcs)
+
+    return diff_map, delta_im_map.date
diff --git a/smart/segmentation/map_processing.py b/smart/segmentation/map_processing.py
new file mode 100644
index 0000000..810fb0c
--- /dev/null
+++ b/smart/segmentation/map_processing.py
@@ -0,0 +1,160 @@
+import numpy as np
+import skimage as ski
+from matplotlib import colors
+from skimage.morphology import disk
+
+import astropy.units as u
+
+from sunpy.map import Map, all_coordinates_from_map, coordinate_is_on_solar_disk
+
+
+def map_threshold(im_map):
+    """
+    Mask the off-disk pixels of a magnetogram map and set its plot settings.
+
+    Parameters
+    ----------
+    im_map : Map
+        Unprocessed magnetogram map.
+
+    Returns
+    -------
+    im_map : Map
+        Processed magnetogram map with off-disk pixels set to NaN.
+
+    """
+    im_map.data[~coordinate_is_on_solar_disk(all_coordinates_from_map(im_map))] = np.nan
+    im_map.cmap.set_bad("k")
+    im_map.plot_settings["norm"] = colors.Normalize(vmin=-200, vmax=200)
+    return im_map
+
+
+@u.quantity_input
+def smooth_los_threshold(
+    im_map: Map,
+    thresh: u.Quantity[u.Gauss] = 100 * u.Gauss,
+    dilation_radius: u.Quantity[u.arcsec] = 2 * u.arcsec,
+    sigma: u.Quantity[u.arcsec] = 10 * u.arcsec,
+    min_size: u.Quantity[u.arcsec] = 2250 * u.arcsec,
+):
+    """
+    Apply Gaussian smoothing, a noise threshold, and a line-of-sight (LOS) correction to the data.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed SunPy magnetogram map.
+    thresh : `~astropy.units.Quantity`, optional
+        Threshold value used to identify regions of interest (default is 100 Gauss).
+    dilation_radius : `~astropy.units.Quantity`, optional
+        Radius of the disk used for binary dilation (default is 2 arcsec).
+    sigma : `~astropy.units.Quantity`, optional
+        Standard deviation for Gaussian smoothing (default is 10 arcsec).
+    min_size : `~astropy.units.Quantity`, optional
+        Minimum size of regions to keep in the final mask (default is 2250 arcsec, converted to pixels).
+
+    Returns
+    -------
+    smooth_map : Map
+        Map after applying Gaussian smoothing.
+    filtered_labels : numpy.ndarray
+        Boolean mask of the pixels that belong to regions larger than min_size.
+    mask_sizes : numpy.ndarray
+        Boolean array indicating which labelled regions exceed min_size.
+
+    """
+    # Convert the arcsec-valued parameters to pixels using the mean plate scale.
+    pixel_size = (im_map.scale[0] + im_map.scale[1]) / 2
+    dilation_radius = (np.round(dilation_radius / pixel_size)).to_value(u.pix)
+    sigma = (np.round(sigma / pixel_size)).to_value(u.pix)
+    min_size = (np.round(min_size / pixel_size)).to_value(u.pix)
+
+    cosmap_data = cosine_correction(im_map)
+
+    negmask = cosmap_data < -thresh
+    posmask = cosmap_data > thresh
+    mask = negmask | posmask
+
+    dilated_mask = ski.morphology.binary_dilation(mask, disk(dilation_radius))
+    smoothed_data = ski.filters.gaussian(np.nan_to_num(cosmap_data) * ~dilated_mask, sigma)
+    smooth_map = Map(smoothed_data, im_map.meta)
+
+    labels = ski.measure.label(dilated_mask)
+    label_sizes = np.bincount(labels.ravel())
+    mask_sizes = label_sizes > min_size
+    mask_sizes[0] = 0
+    filtered_labels = mask_sizes[labels]
+    return smooth_map, filtered_labels, mask_sizes
+
+
+def get_cosine_correction(im_map: Map):
+    """
+    Get the cosine correction map and off-limb pixel map using WCS.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed SunPy magnetogram map.
+
+    Returns
+    -------
+    cos_cor : numpy.ndarray
+        Array of cosine correction factors for each pixel. Pixels beyond the edge radius are set to 1.
+    d_angular : numpy.ndarray
+        Array of angular distances from the disk centre in radians.
+    off_limb : numpy.ndarray
+        Binary array where pixels on the disk are 1 and pixels off the disk are 0.
+    """
+
+    # Take off an extra percent from the disk to get rid of limb effects.
+    edge = 0.99
+
+    coordinates = all_coordinates_from_map(im_map)
+    x = coordinates.Tx.to(u.arcsec)
+    y = coordinates.Ty.to(u.arcsec)
+    d_radial = np.sqrt(x**2 + y**2)
+
+    cos_cor_ratio = d_radial / im_map.rsun_obs
+    cos_cor_ratio = np.clip(cos_cor_ratio, -1, 1)
+    d_angular = np.arcsin(cos_cor_ratio)
+    cos_cor = 1 / np.cos(d_angular)
+
+    off_disk = d_radial > (im_map.rsun_obs * edge)
+    cos_cor[off_disk] = 1
+
+    off_limb = np.zeros_like(d_radial.value)
+    off_limb[off_disk] = 0
+    off_limb[~off_disk] = 1
+    return cos_cor, d_angular, off_limb
+
+
+def cosine_correction(im_map: Map, cosmap=None):
+    """
+    Perform the magnetic field cosine correction.
+
+    Parameters
+    ----------
+    im_map : Map
+        Processed SunPy magnetogram map.
+    cosmap : numpy.ndarray, optional
+        Array of cosine correction factors for each pixel.
+        If not provided, it is computed using get_cosine_correction.
+
+    Returns
+    -------
+    corrected_data : `~astropy.units.Quantity`
+        The magnetic field data after applying the cosine correction (units of Gauss).
+
+    """
+    if cosmap is None:
+        cosmap = get_cosine_correction(im_map)[0]
+
+    scale = (im_map.scale[0] + im_map.scale[1]) / 2
+
+    # Cap the correction factor at its value one pixel inside the limb.
+    angle_limit = np.arcsin(1 - (scale / im_map.rsun_obs).value)
+    cos_limit = 1 / np.cos(angle_limit)
+    cosmap = np.clip(cosmap, None, cos_limit)
+
+    corrected_data = im_map.data * cosmap * u.Gauss
+    return corrected_data
diff --git a/smart/tests/test_map_processing.py b/smart/tests/test_map_processing.py
new file mode 100644
index 0000000..d3df159
--- /dev/null
+++ b/smart/tests/test_map_processing.py
@@ -0,0 +1,102 @@
+import numpy as np
+import pytest
+
+import astropy.units as u
+
+from sunpy.map import Map, all_coordinates_from_map, coordinate_is_on_solar_disk
+from sunpy.map.mapbase import GenericMap
+
+from smart.segmentation.map_processing import (
+    cosine_correction,
+    get_cosine_correction,
+    map_threshold,
+    smooth_los_threshold,
+)
+
+
+@pytest.fixture
+def hmi_nrt():
+    return "https://solmon.dias.ie/data/2024/06/06/HMI/fits/hmi.m_720s_nrt.20240606_230000_TAI.3.magnetogram.fits"
+
+
+@pytest.fixture
+def mag_map_sample(hmi_nrt):
+    return map_threshold(Map(hmi_nrt))
+
+
+@pytest.fixture
+def im_map(mag_map_sample):
+    # Convenience alias used by the tests below.
+    return mag_map_sample
+
+
+@pytest.fixture
+def create_fake_map(mag_map_sample):
+    # Factory fixture: returns a function that builds a constant-valued map
+    # reusing the sample magnetogram's metadata.
+    def _create_fake_map(value, shape=(4096, 4096)):
+        fake_data = np.ones(shape) * value
+        return Map(fake_data, mag_map_sample.meta)
+
+    return _create_fake_map
+
+
+def test_map_threshold(im_map):
+    processed_map = map_threshold(im_map)
+
+    assert isinstance(processed_map, GenericMap), "Result is not a SunPy Map."
+
+    coordinates = all_coordinates_from_map(processed_map)
+    on_solar_disk = coordinate_is_on_solar_disk(coordinates)
+    assert np.all(np.isnan(processed_map.data[~on_solar_disk])), "Off-disk NaN values not set correctly."
+
+
+def test_get_cosine_correction_shape(im_map):
+    cos_cor, d_angular, off_limb = get_cosine_correction(im_map)
+    assert cos_cor.shape == im_map.data.shape, "cos_cor shape != im_map.data.shape"
+    assert d_angular.shape == im_map.data.shape, "d_angular shape != im_map.data.shape"
+    assert off_limb.shape == im_map.data.shape, "off_limb shape != im_map.data.shape"
+
+
+def test_get_cosine_correction_limits(im_map):
+    cos_cor, d_angular, off_limb = get_cosine_correction(im_map)
+
+    edge = 0.99
+    coordinates = all_coordinates_from_map(im_map)
+    x = coordinates.Tx.to(u.arcsec)
+    y = coordinates.Ty.to(u.arcsec)
+    d_radial = np.sqrt(x**2 + y**2)
+
+    off_disk = d_radial >= (im_map.rsun_obs * edge)
+    on_disk = d_radial < (im_map.rsun_obs * edge)
+
+    assert np.all(cos_cor >= 0), "cos_cor lower limits incorrect"
+    assert np.all(cos_cor <= 1 / np.cos(np.arcsin(edge))), "cos_cor upper limits incorrect"
+
+    assert np.all(d_angular >= np.arcsin(-1) * u.rad), "d_angular lower limits incorrect"
+    assert np.all(d_angular <= np.arcsin(1) * u.rad), "d_angular upper limits incorrect"
+
+    assert np.all(off_limb[off_disk] == 0), "not all off_disk values = 0"
+    assert np.all(off_limb[on_disk] == 1), "not all on_disk values = 1"
+
+
+def test_cosine_correction(im_map):
+    coordinates = all_coordinates_from_map(im_map)
+
+    # A fake map whose data equal the LOS cosine factor (~1 for the small on-disk angles)
+    # should come back as ~1 Gauss everywhere once unit correction factors are applied.
+    los_radial = np.cos(coordinates.Tx.to(u.rad)) * np.cos(coordinates.Ty.to(u.rad))
+
+    fake_map = Map(los_radial.value, im_map.meta)
+    fake_cosmap = np.ones(fake_map.data.shape)
+
+    corrected_data = cosine_correction(fake_map, fake_cosmap)
+    corrected_data_value = corrected_data.to_value(u.Gauss)
+    assert np.allclose(corrected_data_value, 1, atol=1e-4), "cosine corrected data not behaving as expected"
+
+
+def test_smooth_los_threshold(create_fake_map):
+    under_thresh = create_fake_map(1)
+    over_thresh = create_fake_map(1000)
+
+    smooth_under, fl_under, mask_under = smooth_los_threshold(under_thresh, thresh=500 * u.Gauss)
+    smooth_over, fl_over, mask_over = smooth_los_threshold(over_thresh, thresh=500 * u.Gauss)
+
+    assert isinstance(smooth_under, type(under_thresh)), "smooth_under is no longer a Map"
+    assert isinstance(smooth_over, type(over_thresh)), "smooth_over is no longer a Map"
+
+    assert np.sum(fl_under) == 0, "fl should all be False when all data is below threshold"
+    assert np.sum(fl_over) == fl_over.size, "fl should all be True when all data is above threshold"
+
+    assert np.sum(mask_under) == 0, "no regions should have been detected in 'under_thresh'"
+    assert np.sum(mask_over) > 0, "background region should have been detected in 'over_thresh'"
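
For context, a minimal usage sketch (not part of the diff) of how the new segmentation modules chain together. The two FITS paths are placeholders for a pair of consecutive HMI line-of-sight magnetograms; everything else follows the functions added above.

import astropy.units as u
from sunpy.map import Map

from smart.segmentation.map_processing import map_threshold
from smart.segmentation.differential_rotation import diff_rotation
from smart.segmentation.IGM import index_and_grow_mask, plot_indexed_grown_mask

current_map = map_threshold(Map("hmi_magnetogram_t.fits"))         # time t (placeholder path)
previous_map = map_threshold(Map("hmi_magnetogram_t_minus.fits"))  # time t - delta_t (placeholder path)

# Rotate the earlier magnetogram to time t so the two masks can be compared pixel by pixel.
rotated_previous, _ = diff_rotation(current_map, previous_map)

# Remove transient features and label the surviving active regions by decreasing size.
sorted_labels = index_and_grow_mask(current_map, rotated_previous, dilation_radius=2.5 * u.arcsec)
plot_indexed_grown_mask(current_map, sorted_labels)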
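A follow-on sketch for the characterization step, continuing from the variables above. How feature_map is built here (the cosine-corrected data of a single labelled region) is an assumption about the intended input to separate_polarities, not something fixed by this diff.

import numpy as np

from smart.characterization.PSL import PSL_length, separate_polarities
from smart.segmentation.map_processing import cosine_correction

region_id = 1  # the largest region, since labels are sorted by decreasing size
# Assumed input: cosine-corrected magnetogram data restricted to one labelled region.
region_data = np.where(sorted_labels == region_id, cosine_correction(current_map).value, 0)

dilated_posmask, dilated_negmask = separate_polarities(current_map, region_data)
print(f"PSL length of region {region_id}: {PSL_length(dilated_posmask, dilated_negmask)} pixels")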