Source code for petrofit.modeling.models

import os
from copy import copy
from collections import OrderedDict
import warnings

import numpy as np

from scipy.ndimage import rotate
from scipy.special import gammainc, gamma, gammaincinv
from scipy.signal import convolve

from astropy.nddata import block_reduce
from astropy.modeling import FittableModel, Parameter, custom_model, models

__all__ = [
    "get_default_sersic_bounds",
    "make_grid",
    "PSFConvolvedModel2D",
    "Nuker2D",
    "sersic_enclosed",
    "sersic_enclosed_inv",
    "sersic_enclosed_model",
    "PetroApprox",
    "petrosian_profile",
    "petrosian_model",
    "get_default_gen_sersic_bounds",
    "GenSersic2D",
    "Ie_to_I0",
    "I0_to_Ie",
]


[docs] def Ie_to_I0(I_e, n): """ Converts the sersic intensity at the effective radius (I_e) to the intensity at the 0 radius (I_0) based on the sersic index (n) Parameters ---------- I_e : float Sersic intensity at the effective radius. n : float Sersic index. Returns ------- float Intensity at the 0 radius (I_0) """ return I_e * np.exp(gammaincinv(2.0 * n, 0.5))
[docs] def I0_to_Ie(I_0, n): """ Converts the intensity at the 0 radius (I_0) to the Sersic intensity at the effective radius (I_e) based on the Sersic index (n). Parameters ---------- I_0 : float Intensity at the 0 radius (I_0). n : float Sersic index. Returns ------- float Sersic intensity at the effective radius (I_e). """ return I_0 / np.exp(gammaincinv(2.0 * n, 0.5))
[docs] def get_default_sersic_bounds(override={}): """Returns the default bounds of the Sersic profile.""" bounds = { "amplitude": (0.0, None), "r_eff": (1e-3, None), "n": (0.1, 10), "ellip": (0, 0.99), "theta": (-2 * np.pi, 2 * np.pi), } bounds.update(override) return bounds
[docs] def get_default_gen_sersic_bounds(override={}): """ Returns the default bounds of a Generalized Sersic profile. This is identical to `get_default_sersic_bounds` but adds a `c_0` bound. """ bounds = get_default_sersic_bounds() bounds["c_0"] = (0, 2.0) bounds.update(override) return bounds
[docs] def make_grid(size, origin=(0, 0), factor=1): """ Function to make image sampling grid. Parameters ---------- size : int The size of the sampling grid. Sampling grid is a square with each side of size `size`. origin : tuple Bottom corner of the sampling grid (x, y). factor : int Oversampling factor """ assert isinstance(factor, int) x_arange = (np.arange(0.5, size * factor, 1) / factor) - 0.5 y_arange = (np.arange(0.5, size * factor, 1) / factor) - 0.5 x_arange += origin[0] y_arange += origin[1] return np.meshgrid(x_arange, y_arange)
[docs] class PSFConvolvedModel2D(FittableModel): """ Fittable model for converting `FittableModel` and `CompoundModel` into 2D images. This model takes the input sub-model and adds PSF convolution, as well as PSF convolution. Parameters ---------- model : `astropy.modeling.core.Model` Base model to convert into an image. psf : array 2D normalized (i.e sum(psf) = 1) image of the point spread function. oversample : None or int or tuple Oversampling factor. If set to None, no oversampling will be applied to the image. If an integer is provided, the whole image will be oversampled by that factor. If a tuple of `(center_x, center_y, box_length, oversample_factor)` can be used to define an oversampling window. `box_length` and `oversample_factor` should always be integers. `center_x` and `center_y` can be either float values of the oversampling window or string names of parameters in the input model (for example `"x_0"`). psf_oversample : None or int Oversampling factor of the PSF relative to data. The `oversample` factor should be an integer multiple of the PSF oversampling factor (i.e `oversample > psf_oversample`). name : string Name for the `PSFConvolvedModel2D` model instance. """ # Default _param_names list; this will be filled in by the implementation's # __init__ _param_names = () n_inputs = 2 # Model has 2 inputs (x, y) n_outputs = 1 # Model has 1 output z where z = Z(x, y) _oversample = None # Oversampling factor _psf_oversample = None # PSF oversampling factor relative to image _psf = None # PSF image _cache_grid = True # Cache sampling grid? _cached_grid_size = 0 # Cached sampling grid size _cached_grid_factor = 0 # Cached sampling grid oversampling rule _cached_grid = None # Cached sampling grid _cache_grid_range = None # Cached sampling grid range (x.min, x.max, y.min, y.max) def __init__( self, model, psf=None, oversample=None, psf_oversample=None, name=None, **kwargs ): # Reset params self._parameters = None self._parameters_ = {} # Check if input model is good if isinstance(model, self.__class__): raise TypeError( "Can not wrap a PSFConvolvedModel2D, try: PSFConvolvedModel2D(psf_convolved_model.model)" ) assert model.n_inputs == 2, "Input model is not 2D" # Save attributes self.psf = psf self._model = model # Load model params self._load_parameters() # add oversample regions self.oversample = oversample self.psf_oversample = psf_oversample super().__init__(name=name, **kwargs) # Sync model states if "fixed" not in kwargs: self.fixed.update(model.fixed) if "bounds" not in kwargs: self.bounds.update(model.bounds) def _load_parameters(self): """Function to load parameters from the sub-model""" if self._parameters is not None: # do nothing return self._parameters_ = {} self._param_names = [] for param_name, param_val in zip( self._model.param_names, self._model.parameters ): param = Parameter(param_name, default=param_val) self.__dict__[param_name] = param self._parameters_[param_name] = param self._param_names.append(param_name) param_name = "psf_pa" param = Parameter(param_name, default=0) self.__dict__[param_name] = param self._parameters_[param_name] = param self._param_names.append(param_name) self._param_names = tuple(self._param_names) @property def model(self): """Returns sub-model with current parameters of the `PSFConvolvedModel2D`""" model = self._model.copy() for param in model.param_names: setattr(model, param, getattr(self, param).value) fixed = copy(self.fixed) del fixed["psf_pa"] bounds = copy(self.bounds) del bounds["psf_pa"] model.fixed.update(fixed) model.bounds.update(bounds) return model @property def param_names(self): """ On most `Model` classes this is a class attribute, but for `PSFConvolvedModel2D` models it is an instance attribute since each input sub-model can have different parameters. """ return self._param_names @property def psf(self): """PSF Image""" return self._psf @psf.setter def psf(self, psf): if psf is not None and np.round(psf.sum(), 6) != 1: warnings.warn( "Input PSF not normalized to 1, current sum = {}".format(psf.sum()) ) self._psf = psf @property def oversample(self): """Sampling grid oversample Factor""" return self._oversample @oversample.setter def oversample(self, oversample): psf_oversample = self._get_psf_factor() if oversample is not None: if isinstance(oversample, (tuple, list, np.ndarray)): if len(oversample) != 4: raise ValueError("oversample should be (x, y, size, factor).") if not isinstance(oversample[2], int) or not isinstance( oversample[3], int ): raise ValueError("size and factor should be integers.") grid_factor = oversample[3] oversample = tuple(oversample) # Important Conversion! elif isinstance(oversample, int): grid_factor = oversample else: raise TypeError( "oversample should be a single int factor or a tuple (x, y, size, factor)." ) if grid_factor <= 0: raise ValueError("oversample should be a positive int factor.") if grid_factor % psf_oversample != 0: raise ValueError( "oversample should be equal to or an integer multiple of psf_oversample" ) self._oversample = oversample @property def psf_oversample(self): """PSF oversample factor relative to data""" return self._psf_oversample @psf_oversample.setter def psf_oversample(self, psf_oversample): grid_factor = self._get_oversample_factor() if psf_oversample is not None: if self.psf is None: raise ValueError("psf_oversample provided but PSF is None") if not isinstance(psf_oversample, int) or psf_oversample <= 0: raise TypeError( "psf_oversample should be a single positive int factor." ) if grid_factor % psf_oversample != 0: raise ValueError( "oversample should be equal to or an integer multiple of psf_oversample. " "Set PSFConvolvedModel2D.oversample value first." ) self._psf_oversample = psf_oversample def _get_oversample_factor(self): if isinstance(self.oversample, (tuple, list, np.ndarray)): return self.oversample[3] else: return 1 if self.oversample is None else self.oversample def _get_psf_factor(self): return 1 if self.psf_oversample is None else self.psf_oversample
[docs] def clear_cached_grid(self): """Clears cached grid and resets class attributes to default values""" self._cached_grid_size = 0 self._cached_grid_factor = 0 if self._cached_grid is not None: del self._cached_grid self._cached_grid = None self._cache_grid_range = None
@property def cache_grid(self): """Returns the cached sampling grid""" return self._cache_grid @cache_grid.setter def cache_grid(self, value): """Sets the cached sampling grid""" if value is False: self._cache_grid = False self.clear_cached_grid() elif value is True: self._cache_grid = True else: raise ValueError("{} is not a bool, use True or False".format(value))
[docs] def evaluate(self, x, y, *params, **kwargs): """ Evaluate the model on given coordinates and parameters. Parameters: ----------- x : numpy.ndarray Array of x-coordinates where the model is to be evaluated. y : numpy.ndarray Array of y-coordinates where the model is to be evaluated. *params : tuple Additional parameters for the wrapped model, though the last parameter is expected to be `psf_p`. **kwargs : dict Additional keyword arguments for the wrapped mode. Returns: -------- numpy.ndarray The evaluated model image at the given coordinates. Notes: ------ - The function prepares the main sampling grid based on the provided coordinates and oversampling factors. - It constructs the main model image by sampling the sub-model. - If oversampling is specified, the function handles both integer and sub-grid based oversampling. - The model image is convolved with the PSF if provided. - The final model image is reduced to the data resolution if the PSF is oversampled. - The function returns the model image at the specified coordinates. """ # Extract sub-model params as well as `psf_p` *sub_model_params, psf_p = params # Prepare image indices i = (x - x.min()).astype(int) j = (y - y.min()).astype(int) # Is oversampling sub-grid based is_subgrid_oversample = isinstance(self.oversample, tuple) # Compute image size and oversampling factor # ------------------------------------------ psf_factor = ( self._get_psf_factor() ) # Oversampling factor of PSF compared to data # Grid oversample factor compared to data: # If the oversampling is subgrid, then the main grid is at PSF oversample. # If the oversampling is an int, the main grid is at oversample factor. grid_factor = ( psf_factor if is_subgrid_oversample else self._get_oversample_factor() ) grid_to_psf_factor = ( grid_factor // psf_factor ) # Oversampling factor of grid compared to PSF # Grid size params: grid_size = max([i.max(), j.max()]) + 1 # size = max_index + 1 grid_range = (x.min(), x.max(), y.min(), y.max()) # Main grid bounds # Make main grid # -------------- # Make the main sampling grid if ( grid_size == self._cached_grid_size and grid_factor == self._cached_grid_factor and grid_range == self._cache_grid_range ): # If the sampling gird cached main_grid = self._cached_grid else: # Else make a sampling gird main_grid = make_grid( grid_size, origin=(x.min(), y.min()), factor=grid_factor ) # Cache Grid if self.cache_grid: self._cached_grid = main_grid self._cached_grid_size = grid_size self._cached_grid_factor = grid_factor self._cache_grid_range = grid_range # Split main grid to x and y components x_grid, y_grid = main_grid # Main Model Image # ---------------- # Construct main model image by sampling sub-model model_image = self._model.evaluate(x_grid, y_grid, *sub_model_params) # Oversampling # ------------ if not is_subgrid_oversample and grid_to_psf_factor > 1: # If the oversample factor is an int, block reduce the image to PSF resolution model_image = ( block_reduce(model_image, grid_to_psf_factor) / grid_to_psf_factor**2 ) elif is_subgrid_oversample: # If the oversample is a window, compute pixel values for that window # and block reduce the image to PSF resolution. # Load the window params sub_grid_x0, sub_grid_y0, sub_grid_size, sub_grid_factor = self.oversample sub_grid_to_psf_factor = sub_grid_factor // psf_factor if sub_grid_to_psf_factor > 1: # If the center of the window is a parameter name, extract its value if isinstance(sub_grid_x0, str): assert ( sub_grid_x0 in self._model.param_names ), "oversample param '{}' is not in the wrapped model param list".format( sub_grid_x0 ) idx = self._model.param_names.index(sub_grid_x0) sub_grid_x0 = sub_model_params[idx][0] if isinstance(sub_model_params[idx], (list, np.ndarray)) else sub_model_params[idx] if isinstance(sub_grid_y0, str): assert ( sub_grid_y0 in self._model.param_names ), "oversample param '{}' is not in the wrapped model param list".format( sub_grid_y0 ) idx = self._model.param_names.index(sub_grid_y0) sub_grid_y0 = sub_model_params[idx][0] if isinstance(sub_model_params[idx], (list, np.ndarray)) else sub_model_params[idx] # Compute the corner of the sub-grid sub_grid_origin = ( np.round(sub_grid_x0) - sub_grid_size // 2, np.round(sub_grid_y0) - sub_grid_size // 2, ) # Make an oversampled sub-grid for window x_sub_grid, y_sub_grid = make_grid( sub_grid_size, origin=sub_grid_origin, factor=sub_grid_factor ) # Sample the sub-model onto the sub-grid sub_model_oversampled_image = self._model.evaluate( x_sub_grid, y_sub_grid, *sub_model_params ) # Block reduce the window to the psf resolution sub_model_image = ( block_reduce(sub_model_oversampled_image, sub_grid_to_psf_factor) / sub_grid_to_psf_factor**2 ) # Compute window indices in main image frame at data resolution first i_sub_min = int(np.round(sub_grid_origin[0])) j_sub_min = int(np.round(sub_grid_origin[1])) i_sub_max = i_sub_min + sub_grid_size j_sub_max = j_sub_min + sub_grid_size # Clip window indices if i_sub_min < 0: i_sub_min = 0 if j_sub_min < 0: j_sub_min = 0 if i_sub_max > i.max(): i_sub_max = i.max() + 1 if j_sub_max > j.max(): j_sub_max = j.max() + 1 # Convert to PSF resolution i_sub_min *= psf_factor j_sub_min *= psf_factor i_sub_max *= psf_factor j_sub_max *= psf_factor # Add oversampled window to image model_image[j_sub_min:j_sub_max, i_sub_min:i_sub_max] = sub_model_image # PSF convolve # ------------ if self.psf is not None: psf = self.psf psf_p = psf_p[0] if isinstance(psf_p, (list, np.ndarray)) else psf_p if psf_p != 0: psf = rotate(psf, psf_p, reshape=False) model_image = convolve(model_image, psf, mode="same") if psf_factor > 1: # If PSF is oversampled relative to the data, block_reduce to data resolution model_image = block_reduce(model_image, psf_factor) / psf_factor**2 return model_image[j, i]
[docs] class GenSersic2D(models.Sersic2D): """ Two dimensional Sersic surface brightness profile with Generalized Ellipses described in Peng et al. 2010. Parameters ---------- amplitude : float Surface brightness at r_eff. r_eff : float Effective (half-light) radius n : float Sersic Index. x_0 : float, optional x position of the center. y_0 : float, optional y position of the center. ellip : float, optional Ellipticity. theta : float or `~astropy.units.Quantity`, optional The rotation angle as an angular quantity (`~astropy.units.Quantity` or `~astropy.coordinates.Angle`) or a value in radians (as a float). The rotation angle increases counterclockwise from the positive x axis. c_0 : float Boxiness of elliptical isophote. """ c_0 = Parameter(default=0, description="General boxiness of isophote")
[docs] @classmethod def evaluate(cls, x, y, amplitude, r_eff, n, x_0, y_0, ellip, theta, c_0): """Two dimensional Sersic profile function.""" if cls._gammaincinv is None: from scipy.special import gammaincinv cls._gammaincinv = gammaincinv bn = cls._gammaincinv(2.0 * n, 0.5) a, b = r_eff, (1 - ellip) * r_eff cos_theta, sin_theta = np.cos(theta), np.sin(theta) x_maj = (x - x_0) * cos_theta + (y - y_0) * sin_theta x_min = -(x - x_0) * sin_theta + (y - y_0) * cos_theta z = (abs(x_maj / a) ** (c_0 + 2) + abs(x_min / b) ** (c_0 + 2)) ** ( 1 / (c_0 + 2) ) return amplitude * np.exp(-bn * (z ** (1 / n) - 1))
@custom_model def Nuker2D(x, y, amplitude=1, r_break=1, x_0=0, y_0=0, alpha=2, betta=4, gamma=0): """Two dimensional Nuker2D model""" x_maj = (x - x_0) + (y - y_0) x_min = -(x - x_0) + (y - y_0) r = np.sqrt((x_maj) ** 2 + (x_min) ** 2) r[np.where(r == 0)] = np.nan return ( amplitude * (2 ** ((betta - gamma) / alpha)) * (r / r_break) ** (-gamma) * (1 + (r / r_break) ** alpha) ** ((gamma - betta) / alpha) ) @custom_model def CoreSersic2D( x, y, amplitude=1, r_eff=1, r_break=1, n=1, x_0=0, y_0=0, alpha=10, gamma=0.1, ellip=0, theta=0, ): """ Core Sersic model as defined by Graham et al 2003. """ bn = gammaincinv(2.0 * n, 0.5) a, b = r_eff, (1 - ellip) * r_eff cos_theta, sin_theta = np.cos(theta), np.sin(theta) x_maj = (x - x_0) * cos_theta + (y - y_0) * sin_theta x_min = -(x - x_0) * sin_theta + (y - y_0) * cos_theta r = np.sqrt((x_maj / a) ** 2 + (x_min / b) ** 2) r[np.where(r == 0)] = np.nan I = ( amplitude * (2 ** -(gamma / alpha)) * np.exp(bn * (2 ** (1 / alpha) * (r_break / r_eff)) ** (1 / n)) ) return ( I * (1 + (r_break / r) ** (alpha)) ** (gamma / alpha) * np.exp( -bn * ((r**alpha + r_break**alpha) / (r_eff) ** alpha) ** (1 / (alpha * n)) ) )
[docs] def sersic_enclosed(r, amplitude, r_eff, n, ellip=0): """Total Sersic flux enclosed within a radius.""" bn = gammaincinv(2.0 * n, 0.5) x = bn * (r / r_eff) ** (1 / n) g = gamma(2.0 * n) * gammainc(2.0 * n, x) return ( amplitude * (r_eff**2) * 2 * np.pi * n * ((np.exp(bn)) / (bn) ** (2 * n)) * g * (1 - ellip) )
[docs] def sersic_enclosed_inv(f, amplitude, r_eff, n, ellip=0): """Radius that would enclose the input flux.""" bn = gammaincinv(2.0 * n, 0.5) g = f / ( amplitude * (r_eff) ** 2 * 2 * np.pi * n * ((np.exp(bn)) / (bn) ** (2 * n)) * (1 - ellip) ) x = gammaincinv(2.0 * n, g / gamma(2.0 * n)) return (x / bn) ** n * r_eff
@custom_model def sersic_enclosed_model(x, amplitude=1000, r_eff=30, n=2, ellip=0): """Model for total Sersic flux enclosed within a radius.""" return sersic_enclosed(x, amplitude, r_eff, n, ellip=ellip)
[docs] def petrosian_profile(r, r_eff, n): """Ideal Sersic Petrosian profile evaluated at input radii.""" bn = gammaincinv(2.0 * n, 0.5) x = bn * (r / r_eff) ** (1 / n) g = gamma(2 * n) * gammainc(2 * n, x) return (np.exp(-x) * x ** (2 * n)) / (2 * n * g)
@custom_model def petrosian_model(x, r_eff=1, n=4): """Ideal Sersic Petrosian model.""" return petrosian_profile(x, r_eff, n)
[docs] class PetroApprox: """ This class contains approximations of various Pertorisian and Sersic parameters. These approximations do not take into account PSF smearing but can be used to approximate initial guesses. """
[docs] @staticmethod def u2080_to_c2080(u2080): """Uncorrected C2080 to epsilon corrected C2080""" return models.Polynomial1D( 6, c0=2.26194802, c1=-3.61130833, c2=3.8219758, c3=-1.6414660, c4=0.38059409, c5=-0.0450384, c6=0.00221922, )(u2080)
[docs] @staticmethod def c2080_to_n(c2080): """Corrected C2080 to Sersic index n""" return models.Polynomial1D( 5, c0=-0.41844073, c1=0.20487513, c2=0.08626531, c3=0.0106707, c4=-0.00082523, c5=0.00002486, )(c2080)
[docs] @staticmethod def n_to_epsilon(n): """Sersic index n to epsilon""" approx_model = models.Polynomial1D( 5, c0=-6.54870813, c1=-2.15040843, c2=-0.28993623, c3=-0.04099376, c4=-0.00046837, c5=-0.00022305, ) + models.Exponential1D(amplitude=7.48787292, tau=2.6876055) return approx_model(n)
[docs] @staticmethod def p0502_to_epsilon(p0502): """Petrosian concentration index P0502 (uncorrected quantity) to epsilon""" approx_model = models.Polynomial1D( 6, c0=1.09339566, c1=-0.14524911, c2=0.50361697, c3=-0.1215809, c4=0.02533795, c5=-0.00196243, c6=0.00009081, ) + models.Exponential1D(amplitude=0.03312881, tau=1.83616642) return approx_model(p0502)