Source code for ForMoSA.nested_sampling.ns_analysis

import logging
import numpy as np
from tqdm import tqdm

import ForMoSA.utils.spec as us
from ForMoSA.core.errors import ForMoSAError
from ForMoSA.grid.model_grid import ModelGrid
from ForMoSA.core.loggings import setup_logging
from ForMoSA.transform.observed import ObservedModel
from ForMoSA.utils.misc import get_weighted_percentile
from ForMoSA.core.enums import ParameterKind, ObservationKeys
from ForMoSA.nested_sampling.nested_sampling import NestedSampling
from ForMoSA.transform.apply_effects import ApplyObservationEffects

[docs] class NSAnalysis(object): ''' Class used to subsequent analysis of nested sampling products. It includes the reconstructions of models, best fit, computations of ccf, ... Parameters ---------- ns : NestedSampling Instance of class NestedSampling logger : logging.Logger Logger log_level : str Level of the Logger Notes ----- Authors: Allan Denis ''' def __init__(self, ns: NestedSampling, logger: logging.Logger | None = None, log_level: str = 'INFO') -> None: self._logger = logger or setup_logging() self._ns = ns self._validate() # =================== # Properties # =================== @property def logger(self) -> logging.Logger: """Logger.""" return self._logger @property def ns(self) -> NestedSampling: """Instance of NestedSampling.""" return self._ns @property def best_fit(self) -> list[ObservedModel]: """Best fit.""" if self.ns.results is None: raise ForMoSAError('Please first run the Nested Sampling before computing the best fit', self.logger) best_params = list(self.ns.results.median_parameters.values()) return self.build_models_from_theta(best_params) @property def native_best_fit(self) -> ObservedModel: """Best fit parameters applied to the native model.""" if self.ns.results is None: raise ForMoSAError('Please first run the Nested Sampling before computing the best fit', self.logger) # Best parameters best_params = list(self.ns.results.median_parameters.values()) # wavelength range wrange = (self.ns.restricted_observations.wavelength_range[0] * 0.95, self.ns.restricted_observations.wavelength_range[1] * 1.05) # build restricted grid grid = self.ns.subgrids.parent_grid._restricted_grid(f'{wrange[0]},{wrange[1]}', print_logger=True) # Interpolate nan values grid._interpolate_missing_values() return self._build_native_observed_model(grid, best_params, print_logger=True) # =================== # Methods # =================== def _validate(self) -> None: ''' Validation for NSAnalysis. Notes ----- Authors: Allan Denis ''' if not isinstance(self.ns, NestedSampling): raise ForMoSAError(f'Wrong type for ns: {type(self.ns)}. Expeceted an instance of NestedSampling')
[docs] def build_models_from_theta(self, theta): ''' Build the models from the values of the free parameters drawn by the Nested Sampling Parameters ---------- theta : np.ndarray[float] List of values picked by the Nested Sampling for the free parameters Returns ------- list[ObservedModel] List of instances of class ObservedModel Notes ----- Authors: Allan Denis ''' return self.ns.build_models_from_theta(theta)
def _build_native_observed_model(self, grid: ModelGrid, params: list[float], print_logger: bool = False) -> ObservedModel: ''' Build native observed model including analytic scaling. Parameters ---------- grid : ModelGrid Instance of class ModelGrid params : list[float] list or array model parameters print_logger : bool Whether to print the logger Returns ------- ObservedModel Native observed model Notes ----- Authors: Allan Denis ''' if not isinstance(grid, ModelGrid): raise ForMoSAError(f'Wrong type for grid: {type(grid)}. Expected an instance of ModelGrid', self.logger) # Build observed parameters observed_params = self.ns._build_params_for_obs(params, 0).global_params # Build native model from parameters native_model = ObservedModel.from_grid_and_params(grid, observed_params) # If radius is fitted, no analytic scaling needed if observed_params.has_kind(ParameterKind.R): return native_model # Radius is not fitted, so we need to analytically scale the model # In order to do that, we need to concatenate all adapted models and all adapted observations to make them comparable wave_all = np.array([]) flux_all = np.array([]) res_all = np.array([]) ind_obs_list = [] for i in range(self.ns.observations.n_observations): # Retrieve adapted subgrid and obs subgrid = self.ns.restricted_subgrids.subgrids[i] obs = self.ns.restricted_observations.observations[i] # Build instance of ObservedModel from parameters observed = ObservedModel.from_grid_and_params(subgrid, observed_params) if len(observed.wave) != len(obs.wave): observed = ApplyObservationEffects._apply_resolution_decreasing(observed, obs) if not obs.hc_mode: # Concatenate everything wave_all = np.append(wave_all, observed.wave) flux_all = np.append(flux_all, observed.flux) res_all = np.append(res_all, observed.res) ind_obs_list.append(i) # Build stacked observed_model and observations stacked_model = ObservedModel(wave_all, flux_all, res_all) stacked_model._sort() stacked_obs = self.ns.restricted_observations._stack(ind_obs_list, print_logger=print_logger) if np.any(np.isnan(stacked_model.flux)): return ObservedModel(native_model.wave, [np.nan] * len(native_model.wave), native_model.flux) # analytic scaling stacked_model.flux, ck = us.calc_ck( stacked_model.flux, stacked_obs[ObservationKeys.FLUX.canonical], stacked_obs[ObservationKeys.ERROR.canonical], 0, 0, analytic='yes', ) native_model.flux *= ck return native_model
[docs] def best_fit_interval(self, perc: float = 0.68) -> tuple[ObservedModel, ObservedModel]: ''' Confidence interval of the native best fit. Parameters ---------- perc : float Percentile value between 0 and 1 (0.68 for 1 sigma, 0.95 for 2 sigmas) Returns ------- tuple[ObservedModel, ObservedModel] lower and higher values of the flux for the confidence interval Notes ----- Authors: Allan Denis ''' perc = float(perc) # Initial checks if self.ns.results is None: raise ForMoSAError('Please first run the Nested Sampling before computing the best fit', self.logger) if perc < 0 or perc > 1: raise ForMoSAError(f'perc must be a float between 0 and 1. Got {perc} with type {type(perc)}', self.logger) lower = (1 - perc) / 2 upper = (1 + perc) / 2 models_flux = [] # wavelength range wrange = (self.ns.observations.wavelength_range[0] * 0.95, self.ns.observations.wavelength_range[1] * 1.05) # build restricted grid grid = self.ns.subgrids.parent_grid._restricted_grid(f'{wrange[0]},{wrange[1]}', print_logger=True) # Interpolate nan values grid._interpolate_missing_values() self.logger.info(f' Computing confidence interval with percentiles [{np.round(lower,2)} - {np.round(upper,2)}]') for sample in tqdm(self.ns.results.samples[self.ns.results.burn_in:]): observed_model = self._build_native_observed_model(grid, sample, print_logger=False) models_flux.append(observed_model.flux) models_flux = np.array(models_flux) perc_1sigma_lower = get_weighted_percentile(lower * 100, models_flux, weights=self.ns.results.weights[self.ns.results.burn_in:]) perc_1sigma_higher = get_weighted_percentile(upper * 100, models_flux, weights=self.ns.results.weights[self.ns.results.burn_in:]) return ObservedModel(observed_model.wave, perc_1sigma_lower, observed_model.res), ObservedModel(observed_model.wave, perc_1sigma_higher, observed_model.res)
[docs] def compute_ccf(self, rv_grid: np.ndarray, index: int = 0, theta: list | None = None) -> dict[str, np.ndarray]: ''' Compute and optionally plot the Cross-Correlation Function (CCF). Parameters ---------- rv_grid : np.ndarray Grid of radial velocity values (in km/s) index : int Index of observation used for the ccf computation theta : list List of free values of the parameters. If not provided, the best fitted parameters are used Returns ------- dict[str, np.ndarray] Dictionary of CCF results keyed by observation name Notes ----- Authors: Bhavesh Rajpoot and Allan Denis ''' if theta is not None: if len(theta) != len(self.ns.parameters.free_names): raise ForMoSAError(f'If you provide a list of free values theta, make sure it has the same length of the number of free parameters {len(self.ns.parameters.free_names)}', self.logger) results = {} obs = self.ns.restricted_observations.observations[index] subgrid = self.ns.restricted_subgrids.subgrids[index] if not obs.is_spectroscopic: raise ForMoSAError('observation {obs.name} is not spectroscopic. You cannot compute the CCF') if theta is None: theta = list(self.ns.results.median_parameters.values()) observed_params = self.ns._build_params_for_obs(theta, index) if not observed_params.has_kind(ParameterKind.RV): raise ForMoSAError('You must provide a RV parameter to compute rv/vsini map', self.logger) # Make sure RV parameter is set to 0 for p in observed_params.values: if p.kind == ParameterKind.RV: observed_params.values[p] = 0 native_model = ObservedModel.from_grid_and_params(subgrid, observed_params) wav_fit = self.ns.wave_fit[index] star_flx = obs.star_flux if obs.star_flux is not None else np.array([]) transm = obs.transm if obs.transm is not None else np.array([]) system = obs.system if obs.system is not None else np.array([]) file_tag = f'{obs.name}_{index}' self._logger.info(f' Computing RV CCF for observation {obs.name}') ccf, acf, ccf_star, rv_peak, logL, ccf_raw = us.compute_ccf( native_model.wave, native_model.flux, obs.wave, obs.flux, obs.err, native_model.res, obs.res, subgrid.res_cont, wav_fit, star_flx_obs_spectro=star_flx, transm_obs_spectro=transm, system_obs_spectro=system, rv_grid=rv_grid, rv_sini_map=False, normalize=True ) # Find best RV best_idx = np.unravel_index(np.argmax(ccf), ccf.shape) best_rv = rv_grid[best_idx[0]] self._logger.info(f' Best RV = {best_rv:.1f} km/s') results[file_tag] = { 'rv_grid': rv_grid, 'ccf': ccf, 'acf': acf, 'ccf_star': ccf_star, 'rv_peak': rv_peak, 'logL': logL } return results
[docs] def compute_rv_vsini_map(self, rv_grid: np.ndarray, vsini_grid: np.ndarray, index: int = 0, theta: list | None = None) -> dict[str, np.ndarray]: ''' Compute and optionally plot the RV vs v.sin(i) loglikelihood map. Parameters ---------- rv_grid : np.ndarray Grid of radial velocity values (in km/s) vsini_grid : np.ndarray Grid of v.sin(i) values (in km/s) index : int Index of observation used for the ccf computation theta : list List of free values of the parameters. If not provided, the best fitted parameters are used Returns ------- dict[str, np.ndarray] Dictionary of RV-vsini map results keyed by observation name Notes ----- Authors: Bhavesh Rajpoot and Allan Denis ''' if theta is not None: if len(theta) != len(self.ns.parameters.free_names): raise ForMoSAError(f'If you provide a list of free values theta, make sure it has the same length of the number of free parameters {len(self.ns.parameters.free_names)}', self.logger) results = {} obs = self.ns.restricted_observations.observations[index] subgrid = self.ns.restricted_subgrids.subgrids[index] if not obs.is_spectroscopic: raise ForMoSAError('observation {obs.name} is not spectroscopic. You cannot compute the CCF') if theta is None: theta = list(self.ns.results.median_parameters.values()) observed_params = self.ns._build_params_for_obs(theta, index) # Make sure RV parameter is set to 0 for p in observed_params.values: if p.kind == ParameterKind.RV: observed_params.values[p] = 0 # Make sure VSINI parameter is set to 0 for p in observed_params.values: if p.kind == ParameterKind.VSINI: observed_params.values[p] = 0 vsini_type = p.vsini_function.value if not observed_params.has_kind(ParameterKind.RV): raise ForMoSAError('You must provide a RV parameter to compute rv/vsini map', self.logger) if not observed_params.has_kind(ParameterKind.VSINI): raise ForMoSAError('You must provide a VSINI parameter to compute rv/vsini map', self.logger) native_model = ObservedModel.from_grid_and_params(subgrid, observed_params) ld_value = observed_params.get_kind(ParameterKind.LD) wav_fit = self.ns.wave_fit[index] star_flx = obs.star_flux if obs.star_flux is not None else np.array([]) transm = obs.transm if obs.transm is not None else np.array([]) system = obs.system if obs.system is not None else np.array([]) file_tag = f'{obs.name}_{index}' self._logger.info(f' Computing RV-vsini map for observation {file_tag}') logL_map = np.zeros((len(vsini_grid), len(rv_grid))) for j, vsini_val in enumerate(tqdm(vsini_grid, desc=f'RV-vsini map ({file_tag})', leave=False)): flx_broadened, res_broadened = us.vsini_fct( native_model.wave, native_model.flux, native_model.res, ld_value, vsini_val, vsini_type ) logL = us.compute_ccf( native_model.wave, flx_broadened, obs.wave, obs.flux, obs.err, res_broadened, obs.res, subgrid.res_cont, wav_fit, star_flx_obs_spectro=star_flx, transm_obs_spectro=transm, system_obs_spectro=system, rv_grid=rv_grid, rv_sini_map=True, normalize=False ) logL_map[j] = logL # Find best RV and vsini best_idx = np.unravel_index(np.argmax(logL_map), logL_map.shape) best_vsini = vsini_grid[best_idx[0]] best_rv = rv_grid[best_idx[1]] self._logger.info(f' Best RV = {best_rv:.1f} km/s, best vsini = {best_vsini:.1f} km/s') results[file_tag] = { 'rv_grid': rv_grid, 'vsini_grid': vsini_grid, 'logL_map': logL_map, 'best_rv': best_rv, 'best_vsini': best_vsini } return results