Source code for ForMoSA.plotting.plotting_class
from __future__ import print_function, division
import os, glob
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter
from scipy.interpolate import interp1d
from scipy.optimize import curve_fit
import corner
import xarray as xr
import pickle
import astropy.constants as cst
from tqdm import tqdm
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
# Import ForMoSA
from ForMoSA.global_file import GlobFile
from ForMoSA.utils_spec import resolution_decreasing, continuum_estimate
from ForMoSA.nested_sampling.nested_modif_spec import modif_spec
from ForMoSA.nested_sampling.nested_modif_spec import doppler_fct
from ForMoSA.nested_sampling.nested_modif_spec import vsini_fct
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
[docs]
class ComplexRadar():
'''
Class to create Radar plots with asymmetric error bars.
Author: Paulina Palma-Bifani, Matthieu Ravet, Allan Denis
Adapted from Damian Cummins: https://github.com/DamianCummins/statsbomb-football-event-visualisations/blob/master/Statsbomb%20Womens%20World%20Cup%202019%20visualisation.ipynb
'''
def __init__(self, fig, variables, ranges, n_ordinate_levels=6):
'''
Initialize class.
Args:
fig (object): matplotlib figure object
variables (list): list of parameters to plot
ranges (list(tuple)): upper and lower limits for each parameters
n_ordinate_levels (int): (default = 6) number of gridlines in the plot
Returns:
None
'''
angles = np.arange(0, 360, 360./len(variables))
axes = [fig.add_axes([0.1,0.1,0.9,0.9], polar=True, label = "axes{}".format(i)) for i in range(len(variables))]
l, text = axes[0].set_thetagrids(angles, labels=variables)
[[txt.set_fontweight('bold'),
txt.set_fontsize(12),
txt.set_position((0,-0.2))] for txt in text]
for ax in axes[1:]:
ax.patch.set_visible(False)
ax.grid("off")
ax.xaxis.set_visible(False)
for i, ax in enumerate(axes):
grid = np.linspace(*ranges[i], num=n_ordinate_levels)
gridlabel = ["{}".format(round(x,2)) for x in grid]
gridlabel[0] = "" # clean up origin
ax.set_rgrids(grid, labels=gridlabel,angle=angles[i])
ax.set_ylim(*ranges[i])
# variables for plotting
self.angle = np.deg2rad(np.r_[angles, angles[0]])
self.ranges = ranges
self.ax = axes[0]
[docs]
def plot(self, data, *args, **kw):
'''
Function to display the plot.
Args:
data (list): best value for each parameter
*args : Variable length argument list.
**kw : Arbitrary keyword arguments.
Returns:
None
'''
sdata = self.scale_data(data, self.ranges)
self.ax.plot(self.angle, np.r_[sdata, sdata[0]], *args, **kw)
[docs]
def fill(self, data, *args, **kw):
'''
Add symmetric error bars to the plot.
Args:
data (list): best value for each parameter
*args : Variable length argument list.
**kw : Arbitrary keyword arguments.
Returns:
None
'''
sdata = self.scale_data(data, self.ranges)
self.ax.fill(self.angle, np.r_[sdata, sdata[0]], *args, **kw)
[docs]
def fill_between(self, list_down, list_up, *args, **kw):
'''
Add asymmetric error bars to the plot.
Args:
list_down (list): list of lower error bars
list_up (list): list of upper error bars
*args : Variable length argument list.
**kw : Arbitrary keyword arguments.
Returns:
None
'''
sdata_down = self.scale_data(list_down, self.ranges)
sdata_up = self.scale_data(list_up, self.ranges)
self.ax.fill_between(self.angle,np.r_[sdata_down,sdata_down[0]], np.r_[sdata_up,sdata_up[0]], *args, **kw)
[docs]
def scale_data(self, data, ranges):
'''
Function to check that lower and upper limits are correctly ordered. It scales data[1:] to ranges[0]
Args:
data (list): best value for each parameter
ranges (list(tuple)): upper and lower limits for each parameters
*args : Variable length argument list.
**kw : Arbitrary keyword arguments.
Returns:
None
'''
for d, (y1, y2) in zip(data[1:], ranges[1:]):
if not np.isnan(d):
assert (y1 <= d <= y2) or (y2 <= d <= y1)
x1, x2 = ranges[0]
d = data[0]
sdata = [d]
for d, (y1, y2) in zip(data[1:], ranges[1:]):
if y1 > y2:
d = _invert(d, (y1, y2))
y1, y2 = y2, y1
sdata.append((d-y1) / (y2-y1) * (x2 - x1) + x1)
return sdata
# ----------------------------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------------------------------
[docs]
class PlottingForMoSA():
'''
Class containing all the plotting functionalities of ForMoSA.
Author: Paulina Palma-Bifani, Simon Petrus, Matthieu Ravet and Allan Denis
'''
def __init__(self, config_file_path=None, color_out='blue', global_params=None):
'''
Initialize class by inheriting the global parameter class of ForMoSA.
Args:
config_file_path (str): path to the config.ini file currently used
color_out (str): color to use for the model
global_params (GlobFile, optional): already-initialized GlobFile object
Returns:
None
'''
if global_params is not None:
self.global_params = global_params
elif config_file_path is not None:
self.global_params = GlobFile(config_file_path)
else:
raise ValueError("Either 'config_file_path' or 'global_params' must be provided.")
self.color_out = color_out
def _get_posteriors(self, burn_in=0):
'''
Function to get the posteriors, including luminosity derivation and Bayesian evidence logz.
Args:
burn_in (int): Burn-in parameter to cut the convergence chain
Returns:
None
'''
with open(self.global_params.result_path + '/result_' + self.global_params.ns_algo + '.pic', 'rb') as open_pic:
result = pickle.load(open_pic)
self.samples = result['samples'][burn_in:]
self.weights = result['weights'][burn_in:]
# To test the quality of the fit
self.logl=result['logl'][burn_in:]
ind = np.where(self.logl==max(self.logl))
self.theta_best = self.samples[ind][0]
self.sample_logz = round(result['logz'][0],1)
self.sample_logzerr = round(result['logz'][1],1)
self.outputs_string = 'logz = '+ str(self.sample_logz)+' ± '+str(self.sample_logzerr)
ds = xr.open_dataset(self.global_params.model_path, decode_cf=False, engine='netcdf4')
attrs = ds.attrs
extra_parameters = [['r', 'R', r'(R$\mathrm{_{Jup}}$)'],
['d', 'd', '(pc)'],
[r'$\alpha$', r'$\alpha$', ''],
['rv', 'RV', r'(km.s$\mathrm{^{-1}}$)'],
['av', 'Av', '(mag)'],
['vsini', 'v.sin(i)', r'(km.s$\mathrm{^{-1}}$)'],
['ld', 'limb darkening', ''],
['bb_t', 'bb_t', '(K)'],
['bb_r', 'bb_r', r'(R$\mathrm{_{Jup}}$)']
]
tot_list_param_title = []
theta_index = []
if self.global_params.par1[0] != 'NA' and self.global_params.par1[0] != 'constant':
tot_list_param_title.append(attrs['title'][0] + ' ' + attrs['unit'][0])
theta_index.append('par1')
if self.global_params.par2[0] != 'NA' and self.global_params.par2[0] != 'constant':
tot_list_param_title.append(attrs['title'][1] + ' ' + attrs['unit'][1])
theta_index.append('par2')
if self.global_params.par3[0] != 'NA' and self.global_params.par3[0] != 'constant':
tot_list_param_title.append(attrs['title'][2] + ' ' + attrs['unit'][2])
theta_index.append('par3')
if self.global_params.par4[0] != 'NA' and self.global_params.par4[0] != 'constant':
tot_list_param_title.append(attrs['title'][3] + ' ' + attrs['unit'][3])
theta_index.append('par4')
if self.global_params.par5[0] != 'NA' and self.global_params.par5[0] != 'constant':
tot_list_param_title.append(attrs['title'][4] + ' ' + attrs['unit'][4])
theta_index.append('par5')
# Extra-grid parameters
if self.global_params.r[0] != 'NA' and self.global_params.r[0] != 'constant':
tot_list_param_title.append(extra_parameters[0][1] + ' ' + extra_parameters[0][2])
theta_index.append('r')
if self.global_params.d[0] != 'NA' and self.global_params.d[0] != 'constant':
tot_list_param_title.append(extra_parameters[1][1] + ' ' + extra_parameters[1][2])
theta_index.append('d')
# - - - - - - - - - - - - - - - - - - - - -
# Individual parameters / observation
main_obs_path = self.global_params.main_observation_path
if len(self.global_params.alpha) > 3: # If you want separate alpha for each observations
for indobs, obs in enumerate(sorted(glob.glob(main_obs_path))):
if self.global_params.alpha[indobs*3] != 'NA' and self.global_params.alpha[indobs*3] != 'constant': # Check if the idobs is different from constant
tot_list_param_title.append(extra_parameters[2][1] + fr'$_{indobs}$' + ' ' + extra_parameters[2][2])
theta_index.append(f'alpha_{indobs}')
else: # If you want 1 common alpha for all observations
if self.global_params.alpha[0] != 'NA' and self.global_params.alpha[0] != 'constant':
tot_list_param_title.append(extra_parameters[2][1] + ' ' + extra_parameters[2][2])
theta_index.append('alpha')
if len(self.global_params.rv) > 3: # If you want separate rv for each observations
for indobs, obs in enumerate(sorted(glob.glob(main_obs_path))):
if self.global_params.rv[indobs*3] != 'NA' and self.global_params.rv[indobs*3] != 'constant': # Check if the idobs is different from constant
tot_list_param_title.append(extra_parameters[3][1] + fr'$_{indobs}$' + ' ' + extra_parameters[3][2])
theta_index.append(f'rv_{indobs}')
else: # If you want 1 common rv for all observations
if self.global_params.rv[0] != 'NA' and self.global_params.rv[0] != 'constant':
tot_list_param_title.append(extra_parameters[3][1] + ' ' + extra_parameters[3][2])
theta_index.append('rv')
if len(self.global_params.vsini) > 4: # If you want separate vsini for each observations
for indobs, obs in enumerate(sorted(glob.glob(main_obs_path))):
if self.global_params.vsini[indobs*4] != 'NA' and self.global_params.vsini[indobs*4] != 'constant': # Check if the idobs is different from constant
tot_list_param_title.append(extra_parameters[5][1] + fr'$_{indobs}$' + ' ' + extra_parameters[5][2])
theta_index.append(f'vsini_{indobs}')
else: # If you want 1 common vsini for all observations
if self.global_params.vsini[0] != 'NA' and self.global_params.vsini[0] != 'constant':
tot_list_param_title.append(extra_parameters[5][1] + ' ' + extra_parameters[5][2])
theta_index.append('vsini')
if len(self.global_params.ld) > 3: # If you want separate ld for each observations
for indobs, obs in enumerate(sorted(glob.glob(main_obs_path))):
if self.global_params.ld[indobs*3] != 'NA' and self.global_params.ld[indobs*3] != 'constant': # Check if the idobs is different from constant
tot_list_param_title.append(extra_parameters[6][1] + fr'$_{indobs}$' + ' ' + extra_parameters[6][2])
theta_index.append(f'ld_{indobs}')
else: # If you want 1 common vsini for all observations
if self.global_params.ld[0] != 'NA' and self.global_params.ld[0] != 'constant':
tot_list_param_title.append(extra_parameters[6][1] + ' ' + extra_parameters[6][2])
theta_index.append('ld')
# - - - - - - - - - - - - - - - - - - - - -
if self.global_params.av[0] != 'NA' and self.global_params.av[0] != 'constant':
tot_list_param_title.append(extra_parameters[4][1] + ' ' + extra_parameters[4][2])
theta_index.append('av')
## cpd bb
if self.global_params.bb_t[0] != 'NA' and self.global_params.bb_t[0] != 'constant':
tot_list_param_title.append(extra_parameters[7][1] + ' ' + extra_parameters[7][2])
theta_index.append('bb_t')
if self.global_params.bb_r[0] != 'NA' and self.global_params.bb_r[0] != 'constant':
tot_list_param_title.append(extra_parameters[8][1] + ' ' + extra_parameters[8][2])
theta_index.append('bb_r')
self.theta_index = np.asarray(theta_index)
posterior_to_plot = self.samples
if self.global_params.r[0] != 'NA' and self.global_params.r[0] != 'constant':
posterior_to_plot = []
tot_list_param_title.append(r'log(L/L$\mathrm{_{\odot}}$)')
for res, results in enumerate(self.samples):
ind_theta_r = np.where(self.theta_index == 'r')
r_picked = results[ind_theta_r[0]]
lum = np.log10(4 * np.pi * (r_picked * cst.R_jup.value) ** 2 * results[0] ** 4 * cst.sigma_sb.value / cst.L_sun.value)
results = np.concatenate((results, np.asarray(lum)))
posterior_to_plot.append(results)
self.posterior_to_plot = np.array(posterior_to_plot)
self.posteriors_names = tot_list_param_title
def _get_outputs(self, theta):
'''
Function to get the data and best model asociated.
Args:
theta (list): best parameter values
Returns:
- modif_spec_LL list(n-array): list containing the observational dictionary, model flux and their associated scalings
for the spectroscopy and photometry
'''
# Create a list for each outputs (obs and mod) for each observation + scaling factors
modif_spec_LL = []
for indobs, obs in enumerate(sorted(glob.glob(self.global_params.main_observation_path))):
# Recovery of the observational dictionnary
self.global_params.observation_path = obs
obs_name = os.path.splitext(os.path.basename(self.global_params.observation_path))[0]
obs_dict = np.load(os.path.join(self.global_params.result_path, f'spectrum_obs_{obs_name}.npz'), allow_pickle=True)
# Recovery of the spectroscopy and photometry model
path_grid_spectro = os.path.join(self.global_params.adapt_store_path, f'adapted_grid_spectro_{self.global_params.grid_name}_{obs_name}_nonan.nc')
ds_spectro = xr.open_dataset(path_grid_spectro, decode_cf=False, engine='netcdf4')
grid_spectro = ds_spectro['grid']
path_grid_photo = os.path.join(self.global_params.adapt_store_path, f'adapted_grid_photo_{self.global_params.grid_name}_{obs_name}_nonan.nc')
ds_photo = xr.open_dataset(path_grid_photo, decode_cf=False, engine='netcdf4')
grid_photo = ds_photo['grid']
# Emulator (if necessary)
if self.global_params.emulator[0] != 'NA':
# PCA or NMF
mod_dict = dict(np.load(os.path.join(self.global_params.result_path, f'{self.global_params.emulator[0]}_mod_{obs_name}.npz'), allow_pickle=True))
else:
# Standard method
mod_dict = {'wav_spectro': np.asarray(ds_spectro.coords['wavelength']), 'res_spectro': np.asarray(ds_spectro.attrs['res'])}
ds_spectro.close()
ds_photo.close()
# Interpolating the model resolution
if len(obs_dict['wav_spectro']) != 0:
interp_mod_to_obs = interp1d(mod_dict['wav_spectro'], mod_dict['res_spectro'], fill_value='extrapolate') # Interpolate model resolution onto the data
mod_dict['res_spectro'] = interp_mod_to_obs(obs_dict['wav_spectro'])
if self.global_params.par3[0] == 'NA':
if len(obs_dict['wav_spectro']) != 0:
interp_spectro = np.asarray(grid_spectro.interp(par1=theta[0], par2=theta[1],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_spectro = np.asarray([])
if len(obs_dict['wav_photo']) != 0:
interp_photo = np.asarray(grid_photo.interp(par1=theta[0], par2=theta[1],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_photo = np.asarray([])
elif self.global_params.par4[0] == 'NA':
if len(obs_dict['wav_spectro']) != 0:
interp_spectro = np.asarray(grid_spectro.interp(par1=theta[0], par2=theta[1], par3=theta[2],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_spectro = np.asarray([])
if len(obs_dict['wav_photo']) != 0:
interp_photo = np.asarray(grid_photo.interp(par1=theta[0], par2=theta[1], par3=theta[2],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_photo = np.asarray([])
elif self.global_params.par5[0] == 'NA':
if len(obs_dict['wav_spectro']) != 0:
interp_spectro = np.asarray(grid_spectro.interp(par1=theta[0], par2=theta[1], par3=theta[2], par4=theta[3],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_spectro = np.asarray([])
if len(obs_dict['wav_photo']) != 0:
interp_photo = np.asarray(grid_photo.interp(par1=theta[0], par2=theta[1], par3=theta[2], par4=theta[3],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_photo = np.asarray([])
else:
if len(obs_dict['wav_spectro']) != 0:
interp_spectro = np.asarray(grid_spectro.interp(par1=theta[0], par2=theta[1], par3=theta[2], par4=theta[3],
par5=theta[4],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_spectro = np.asarray([])
if len(obs_dict['wav_photo']) != 0:
interp_photo = np.asarray(grid_photo.interp(par1=theta[0], par2=theta[1], par3=theta[2], par4=theta[3],
par5=theta[4],
method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
else:
interp_photo = np.asarray([])
# Recreate the flux array
if self.global_params.emulator[0] != 'NA':
if self.global_params.emulator[0] == 'PCA':
if len(mod_dict['vectors_spectro']) != 0:
flx_mod_spectro = (mod_dict['flx_mean_spectro']+mod_dict['flx_std_spectro'] * (interp_spectro[1:] @ mod_dict['vectors_spectro'])) * interp_spectro[0][np.newaxis]
else:
flx_mod_spectro = np.asarray([])
if len(mod_dict['vectors_photo']) != 0:
flx_mod_photo = (mod_dict['flx_mean_photo']+mod_dict['flx_std_photo'] * (interp_photo[1:] @ mod_dict['vectors_photo'])) * interp_photo[0][np.newaxis]
else:
flx_mod_photo = np.asarray([])
elif self.global_params.emulator[0] == 'NMF':
if len(mod_dict['vectors_spectro']) != 0:
flx_mod_spectro = interp_spectro[:] @ mod_dict['vectors_spectro']
else:
flx_mod_spectro = np.asarray([])
if len(mod_dict['vectors_photo']) != 0:
flx_mod_photo = interp_photo[:] @ mod_dict['vectors_photo']
else:
flx_mod_photo = np.asarray([])
else:
flx_mod_spectro = interp_spectro
flx_mod_photo = interp_photo
# Modification of the synthetic spectrum with the extra-grid parameters
modif_spec_LL.append(modif_spec(self.global_params, theta, self.theta_index,
obs_dict,
flx_mod_spectro, flx_mod_photo,
mod_dict['wav_spectro'], mod_dict['res_spectro'],
indobs=indobs))
return modif_spec_LL
def _get_model_spectrum(self, theta, grid_used='original', wav_bounds=[], res=1000, re_interp=False, indobs=0):
'''
Extract a model spectrum from a grid at a given theta, resolution and wavelength extent.
Args:
theta (list): best parameter values
grid_used (str): (default = 'original') Path to the grid from where to extract the spectrum. If 'original', the current grid will be used.
wav_bounds (list): (default = []) Desired wavelength range. If [] max and min values of the model wavelength range will be use to create the final wavelength range.
res (int): (default = 1000) Spectral resolution (at Nyquist).
re_interp (boolean): (default = False). Option to reinterpolate or not the grid.
int_method (str): (default = "linear") Interpolation method for the grid (if reinterpolated).
Returns:
- wav_final (array): Wavelength array of the full model
- flx_final (array): Flux array of the full model
- res_final (float): Resolution array of the full model
- scale_spectro (float): Spectroscopic flux scaling factor
- scale_photo (float): Photometric flux scaling factor
'''
_, _, _, _, scale_spectro, scale_photo = self._get_outputs(theta)[indobs]
# WARNING : In case of multiple spectra, it is possible to work with different scaling factors. Here we only take the scaling factor of the first spectrum
#in the MOSAIC (used for the plot_fit)
# Recover the original grid
if grid_used == 'original':
path_grid = self.global_params.model_path
else:
path_grid = grid_used
# Recover the original grid
ds = xr.open_dataset(path_grid, decode_cf=False, engine="netcdf4")
N_para = len(ds.coords)-1
# Possibility of re-interpolating holes if the grid contains to much of them (WARNING: Very long process)
if re_interp == True:
print('-> The possible holes in the grid are (re)interpolated: ')
for key_ind, key in enumerate(ds.attrs['key']):
print(str(key_ind+1) + '/' + str(len(ds.attrs['key'])))
ds = ds.interpolate_na(dim=key, method=self.global_params.method, fill_value="extrapolate", limit=None,
max_gap=None)
ds.close()
# Get grid
grid = ds['grid']
wav_mod_nativ = np.asarray(ds['wavelength'])
res_mod_nativ = ds.attrs['res']
# Interpolate the grid
interp_kwargs = {f"par{k+1}": theta[k] for k in range(N_para)}
flx_mod_nativ = np.array(grid.interp(**interp_kwargs, method=self.global_params.method, kwargs={"fill_value": "extrapolate"})) # shape: (variables, pressure)
# Cut the spectrum
if len(wav_bounds) == 0:
wav_bounds = [wav_mod_nativ.min(), wav_mod_nativ.max()] # Use the min and max of the model wavelength range
else:
flx_mod_nativ = flx_mod_nativ[(wav_bounds[0] < wav_mod_nativ) & (wav_mod_nativ < wav_bounds[1])]
res_mod_nativ = res_mod_nativ[(wav_bounds[0] < wav_mod_nativ) & (wav_mod_nativ < wav_bounds[1])]
wav_mod_nativ = wav_mod_nativ[(wav_bounds[0] < wav_mod_nativ) & (wav_mod_nativ < wav_bounds[1])]
# Prepare output wavelengths
wav_mod = [wav_bounds[0]] # Start with the minimum wavelength
dwav = [0]
while wav_mod[-1] < wav_bounds[1]:
dwav_unit = wav_mod[-1] / (2 * res) # Compute spacing (Nyquist sampling)
dwav.append(dwav_unit)
wav_mod.append(wav_mod[-1] + dwav_unit)
wav_mod = np.array(wav_mod)[(wav_mod_nativ[0] < wav_mod) * (wav_mod < wav_mod_nativ[-1])] # Make sure you don't extrapolate
res_mod = np.full(len(wav_mod), res) # Create the resolution array
# Interpolate the resolution array
interp_func = interp1d(wav_mod_nativ, res_mod_nativ, kind='linear', fill_value='extrapolate')
res_mod_nativ = interp_func(wav_mod)
# Decrease the resolution
flx_mod = resolution_decreasing(wav_input=wav_mod_nativ,
flx_input=flx_mod_nativ,
res_input=res_mod_nativ,
wav_output=wav_mod,
res_output=res_mod
)
return wav_mod, flx_mod, res_mod, scale_spectro, scale_photo
def _get_PT_chem(self, theta, path_grid, re_interp_PT_chem=False, re_interp_atm=False):
'''
Function to extract the pressure, temperature and chemical profiles from the PT grid.
This function interpolates the PT grid at the best-fit parameters and computes the brightness temperature of the photosphere.
It also extracts the P/T profile of the photosphere based on the brightness temperature.
This is useful for plotting the P/T profile and the photosphere in the final figure.
Args:
theta (list): best parameter values
path_grid (str): path to the PT grid in xarray format
re_interp_PT_chem (bool): (default = False) Option to reinterpolate the PT grid if it contains holes
re_interp_atm (bool): (default = False) Option to reinterpolate the atmospheric grid if it contains holes
Returns:
- PT_chem (dict): Dictionary containing the pressure, temperature and chemical arrays
- photosphere (dict): Dictionary containing the P/T profile of the photosphere
'''
# P-T / chem grid
ds = xr.open_dataset(path_grid, decode_cf=False, engine='netcdf4')
# Possibility of re-interpolating holes if the grid contains to much of them (WARNING: Very long process)
if re_interp_PT_chem == True:
print('-> The possible holes in the P-T / chem grid are (re)interpolated: ')
for key_ind, key in enumerate(ds.attrs['key']):
print(str(key_ind+1) + '/' + str(len(ds.attrs['key'])))
ds = ds.interpolate_na(dim=key, method=self.global_params.method, fill_value="extrapolate", limit=None,
max_gap=None)
else:
pass
# Extract base parameters
var_names = list(ds.data_vars.keys()) # Save original keys
P = ds.coords['pressure']
# Prepare storage per variable
best_fit_vars = {var: [] for var in var_names}
for j in tqdm(range(len(self.samples)), desc=f"Interpolating P-T / chem grid at theta", unit="sample"):
sample = self.samples[j]
interp_kwargs = {f"par{k+1}": sample[k] for k in range(len(ds.coords)-1)}
interp = ds.interp(**interp_kwargs, method=self.global_params.method, kwargs={"fill_value": "extrapolate"}).to_array()
for v_idx, var in enumerate(var_names):
best_fit_vars[var].append(interp[v_idx].values)
ds.close()
# Convert lists to arrays and compute percentiles
PT_chem = {}
PT_chem["pressure"] = P.values
for var in var_names:
grid = np.array(best_fit_vars[var])
PT_chem[var + '_q2'] = np.percentile(grid, 2, axis=0)
PT_chem[var + '_q16'] = np.percentile(grid, 16, axis=0)
PT_chem[var + '_q50'] = np.percentile(grid, 50, axis=0)
PT_chem[var + '_q84'] = np.percentile(grid, 84, axis=0)
PT_chem[var + '_q98'] = np.percentile(grid, 98, axis=0)
# - - - -
# Extract spectrum for photosphere estimation
ds = xr.open_dataset(self.global_params.model_path, decode_cf=False, engine='netcdf4')
# Possibility of re-interpolating holes if the grid contains to much of them (WARNING: Very long process)
if re_interp_atm == True:
print('-> The possible holes in the atmospheric grid are (re)interpolated: ')
for key_ind, key in enumerate(ds.attrs['key']):
print(str(key_ind+1) + '/' + str(len(ds.attrs['key'])))
ds = ds.interpolate_na(dim=key, method=self.global_params.method, fill_value="extrapolate", limit=None,
max_gap=None)
else:
pass
# Get grid
grid = ds['grid']
wav = grid["wavelength"].values
interp_kwargs = {f"par{k+1}": theta[k] for k in range(len(ds.coords)-1)}
flx = np.array(grid.interp(**interp_kwargs, method=self.global_params.method, kwargs={"fill_value": "extrapolate"}))
ds.close()
# Photosphere range (in µm) and conversion
photosphere_wav = np.asarray([0, 10])
mask = (wav > photosphere_wav[0]) & (wav < photosphere_wav[1])
wav = wav[mask] * 1e-6 # m
flx = flx[mask] * 1e6 # W/m²/m
# Compute brightness temperatures
brightness_temperature = np.zeros_like(wav)
for j in range(len(wav)):
brightness_temperature[j] = (
cst.h.value * cst.c.value / (cst.k_B.value * wav[j]) /
np.log(1 + (2 * cst.h.value * cst.c.value ** 2) /
(wav[j] ** 5 * (flx[j] / np.pi)))
)
brightness_temperature = brightness_temperature[np.isfinite(brightness_temperature)]
T_min = brightness_temperature.min()
T_max = brightness_temperature.max()
# Match to thermal profile
P, T = PT_chem["pressure"], PT_chem["temperature_q50"] # Taking the 'best' profile as reference
mask = (T >= T_min) & (T <= T_max)
# Store photosphere info
photosphere = {
"pressure": P[mask],
"temperature": T[mask],
"brightness_temperature_range": [T_min, T_max]
}
return PT_chem, photosphere
# - - - - - - - -
[docs]
def plot_corner(self, levels_sig=[0.997, 0.95, 0.68], bins=100, quantiles=(0.16, 0.5, 0.84), figsize=(15,15)):
'''
Function to display the corner plot
Args:
levels_sig (list): (default = [0.997, 0.95, 0.68]) 1, 2 and 3 sigma contour levels of the corner plot
bins (int): (default = 100) number of bins for the posteriors
quantiles (list): (default = (0.16, 0.5, 0.84)) mean +- sigma to report the posterior values
burn_in (int): (default = 0) number of steps to remove from the plot
Returns:
- fig (object): matplotlib figure object
'''
print('ForMoSA - Corner plot')
fig = plt.figure(figsize=figsize)
fig = corner.corner(self.posterior_to_plot,
weights=self.weights,
labels=self.posteriors_names,
range=[0.999999 for p in self.posteriors_names],
levels=levels_sig,
bins=bins,
smooth=1,
quantiles=quantiles,
top_ticks=False,
plot_datapoints=False,
plot_density=True,
plot_contours=True,
fill_contours=True,
show_titles=True,
title_fmt='.2f',
title_kwargs=dict(fontsize=14),
contour_kwargs=dict(colors=self.color_out, linewidths=0.7),
pcolor_kwargs=dict(color='red'),
fig=fig,
label_kwargs=dict(fontsize=14))
fig.supxlabel(self.outputs_string, va='top')
return fig
[docs]
def plot_chains(self, figsize=(7,15), show_weights=True):
'''
Plot to check the convergence of the posterior chains.
Multiple (sub-)axis plot.
Args:
figsize (tuple): (default = (7, 15)) size of the plot
show_weights (bool): True or False if you want to show the weights on the chain
Returns:
- fig (object) : matplotlib figure object
- ax (object) : matplotlib axes objects
'''
print('ForMoSA - Posteriors chains for each parameter')
col = int(len(self.posterior_to_plot[0][:])/2)+int(len(self.posterior_to_plot[0][:])%2)
fig, axs = plt.subplots(col, 2, figsize=figsize)
n=0
for i in range(col):
for j in range(2):
axs[i, j].plot(self.posterior_to_plot[:,n], color=self.color_out, alpha=0.8)
axs[i, j].set_ylabel(self.posteriors_names[n])
if show_weights == True:
ax_w = axs[i, j].twinx()
ax_w.plot(self.weights, color='black', alpha=0.5)
ax_w.text(x=0, y=0.00005, s='weights')
if j == 0:
ax_w.set_yticks([])
if self.posteriors_names[n]=='log(L/L$\\mathrm{_{\\odot}}$)':
pass
else:
axs[i, j].axhline(self.theta_best[n],color='k',linestyle='--')
if n == len(self.posteriors_names)-1:
break
else:
n+=1
return fig, axs
[docs]
def plot_radar(self, ranges, label='', quantiles=[0.16, 0.5, 0.84]):
'''
Radar plot to check the distribution of the parameters.
Useful to compare different models.
Args:
ranges (list(tuple)): upper and lower limits for each parameters
label (str): (default = '') label of the plot
quantiles (list): (default = (0.16, 0.5, 0.84)) mean +- sigma to report the posterior values
Returns:
- fig (object) : matplotlib figure object
- radar.ax (object) : matplotlib radar class axes object
'''
print('ForMoSA - Radar plot')
list_posteriors = []
list_uncert_down = []
list_uncert_up = []
for l in range(len(self.posterior_to_plot[1,:])):
q16, q50, q84 = corner.quantile(self.posterior_to_plot[:,l], quantiles)
list_posteriors.append(q50)
list_uncert_down.append(q16)
list_uncert_up.append(q84)
fig = plt.figure(figsize=(6, 6))
radar = ComplexRadar(fig, self.posteriors_names, ranges)
radar.plot(list_posteriors, 'o-', color=self.color_out, label=label)
radar.fill_between(list_uncert_down,list_uncert_up, color=self.color_out, alpha=0.2)
radar.ax.legend(loc='center', bbox_to_anchor=(0.5, -0.20),frameon=False, ncol=2)
return fig, radar.ax
[docs]
def plot_fit(self, figsize=(13, 7), uncert='no', trans='no', logx='no', logy='no', norm='no'):
'''
Plot the best fit comparing with the data.
Args:
figsize (tuple): (default = (10, 5)) Size of the plot
uncert (str): (default = no) 'yes' or 'no' to plot spectra with associated error bars
trans (str): (default = no) 'yes' or 'no' to plot transmision curves for photometry
logx (str): (default = no) 'yes' or 'no' to plot the wavelength in log scale
logy (str): (default = no) 'yes' or 'no' to plot the flux in log scale
norm (str): (default = no) 'yes' or 'no' to plot the normalized spectra
Returns:
- fig (object): matplotlib figure object
- ax (object): matplotlib axes objects, main spectra plot
- axr (object): matplotlib axes objects, residuals
- axr2 (object): matplotlib axes objects, right side density histogram
'''
print('ForMoSA - Best fit and residuals plot')
# Figure setup
fig = plt.figure(figsize=figsize)
fig.tight_layout()
size = (7,11)
ax = plt.subplot2grid(size, (0, 0), rowspan=5, colspan=10)
axr = plt.subplot2grid(size, (5, 0), rowspan=2, colspan=10, sharex=ax)
axr2 = plt.subplot2grid(size, (5, 10), rowspan=2, colspan=1)
# Indices for plot
iobs_spectro = 0
iobs_photo = 0
# Iterate on each obs
for indobs, obs in enumerate(sorted(glob.glob(self.global_params.main_observation_path))):
# Get back spectra
obs_dict, flx_mod_spectro, flx_mod_photo, _, scale_spectro, scale_photo = self._get_outputs(self.theta_best)[indobs]
# Scale or not in absolute flux
if norm != 'yes':
scale_spectro, scale_photo = 1, 1
# Spectroscopic part
if len(obs_dict['wav_spectro']) != 0:
iobs_spectro += 1
iobs_photo += 1
if uncert=='yes':
ax.errorbar(obs_dict['wav_spectro'], obs_dict['flx_spectro']/scale_spectro, yerr=obs_dict['err_spectro']/scale_spectro, c='k', alpha=0.2)
ax.plot(obs_dict['wav_spectro'], obs_dict['flx_spectro']/scale_spectro, c='k')
ax.plot(obs_dict['wav_spectro'], flx_mod_spectro/scale_spectro, c=self.color_out, alpha=0.8)
# Residuals
residuals = obs_dict['flx_spectro'] - flx_mod_spectro
sigma_res = np.nanstd(residuals) # Replace np.std by np.nanstd if nans are in the array to ignore them
axr.plot(obs_dict['wav_spectro'], residuals/sigma_res, c=self.color_out, alpha=0.8)
axr.axhline(y=0, color='k', alpha=0.5, linestyle='--')
axr2.hist(residuals/sigma_res, bins=100, color=self.color_out, alpha=0.5, density=True, orientation='horizontal')
if indobs == iobs_spectro-1:
# Add labels out of the loops
ax.plot(obs_dict['wav_spectro'], np.empty(len(obs_dict['wav_spectro']))*np.nan, c='k', label='Spectroscopic data')
ax.plot(obs_dict['wav_spectro'], np.empty(len(obs_dict['wav_spectro']))*np.nan, c=self.color_out, label='Spectroscopic model')
axr.plot(obs_dict['wav_spectro'], np.empty(len(obs_dict['wav_spectro']))*np.nan, c=self.color_out, label='Spectroscopic data-model')
axr2.hist(residuals/sigma_res, bins=100 , color=self.color_out, alpha=0.2, density=True, orientation='horizontal', label='density')
iobs_spectro = -1
axr2.legend(frameon=False,handlelength=0)
# Photometry part
if len(obs_dict['wav_photo']) != 0:
iobs_photo += 1
iobs_spectro += 1
# If you want to plot the transmission filters
if trans == 'yes':
for pho_ind, pho in enumerate(obs_dict['ins_photo']):
path_list = __file__.split("/")[:-2]
separator = '/'
filter_pho = np.load(separator.join(path_list) + '/phototeque/' + pho + '.npz')
ax.fill_between(filter_pho['x_filt'], filter_pho['y_filt']*0.8*min(obs_dict['flx_photo']/scale_photo),color=self.color_out, alpha=0.3)
ax.text(np.mean(filter_pho['x_filt']), np.mean(filter_pho['y_filt']*0.4*min(obs_dict['flx_photo']/scale_photo)), pho, horizontalalignment='center', c='gray')
if uncert=='yes':
ax.errorbar(obs_dict['wav_photo'], obs_dict['flx_photo']/scale_photo, yerr=obs_dict['err_photo']/scale_photo, c='k', fmt='o', alpha=0.7)
ax.plot(obs_dict['wav_photo'], obs_dict['flx_photo']/scale_photo, 'ko', alpha=0.7)
ax.plot(obs_dict['wav_photo'], flx_mod_photo/scale_photo, 'o', color=self.color_out)
# Residuals
residuals_phot = obs_dict['flx_photo'] - flx_mod_photo
sigma_res = np.std(residuals_phot)
axr.plot(obs_dict['wav_photo'], residuals_phot/sigma_res, 'o', c=self.color_out, alpha=0.8)
axr.axhline(y=0, color='k', alpha=0.5, linestyle='--')
if indobs == iobs_photo-1:
# Add labels out of the loops
ax.plot(obs_dict['wav_photo'], np.empty(len(obs_dict['wav_photo']))*np.nan, 'ko', label='Photometry data')
ax.plot(obs_dict['wav_photo'], np.empty(len(obs_dict['wav_photo']))*np.nan, 'o', c=self.color_out, label='Photometry model')
axr.plot(obs_dict['wav_photo'], np.empty(len(obs_dict['wav_photo']))*np.nan, 'o', c=self.color_out, label='Photometry data-model')
iobs_photo = -1
# Set xlog-scale
if logx == 'yes':
ax.set_xscale('log')
axr.set_xscale('log')
# Set xlog-scale
if logy == 'yes':
ax.set_yscale('log')
# Labels
axr.set_xlabel(r'Wavelength (µm)')
if norm != 'yes':
ax.set_ylabel(r'Flux (W m-2 µm-1)')
else:
ax.set_ylabel(r'Normalised flux (W m-2 µm-1)')
axr.set_ylabel(r'Residuals ($\sigma$)')
axr2.axis('off')
ax.tick_params(labelbottom=False, bottom=False)
ax.legend(frameon=False)
axr.legend(frameon=False)
return fig, ax, axr, axr2
[docs]
def plot_HiRes_comp_model(self, figsize=(10, 5), indobs=0):
'''
Specific function to plot the best fit comparing with the data for high-resolution spectroscopy.
Args:
figsize (tuple): (default = (10, 5)) Size of the plot
indobs (int): Index of the current observation loop
Returns:
- fig1, ax1 (object): matplotlib figure object
- fig, ax (object): matplotlib axes objects
- flx_obs_broadened (array): Flux of the observation with all fitted contributions removed + broadened at vsini
'''
print('ForMoSA - Planet model and data')
# Get back spectra
obs_dict, flx_mod_spectro, _, _, _, _ = self._get_outputs(self.theta_best)[indobs]
# Prepare plot
fig1, ax1 = plt.subplots(1, 1, figsize = figsize)
fig, ax = plt.subplots(1, 1, figsize = figsize)
# Spectroscopic part
if len(obs_dict['wav_spectro']) != 0:
# Remove contributions to show planetary signal if necessary
flx_obs_calib = obs_dict['flx_spectro'] - obs_dict['system'] - obs_dict['star_flx']
flx_mod_calib = flx_mod_spectro- obs_dict['system'] - obs_dict['star_flx']
# Compute intrinsic resolution of the data because of the v.sini (if defined)
try:
if len(self.global_params.vsini) > 4:
new_res = 3.0*1e5 / (self.theta_best[self.theta_index == f'vsini_{indobs}'])
else:
new_res = 3.0*1e5 / (self.theta_best[self.theta_index == 'vsini'])
new_res = new_res * np.ones(len(obs_dict['wav_spectro']))
flx_obs_broadened = resolution_decreasing(obs_dict['wav_spectro'], flx_obs_calib, obs_dict['res_spectro'], obs_dict['wav_spectro'], new_res)
except:
flx_obs_broadened = flx_obs_calib
ax.plot(obs_dict['wav_spectro'], flx_obs_broadened, c='k')
ax.plot(obs_dict['wav_spectro'], flx_mod_calib, c='r')
ax.set_xlabel(r'wavelength ($\mu$m)')
ax.set_ylabel('Flux (ADU)')
ax1.plot(obs_dict['wav_spectro'], flx_obs_calib, c='k')
ax1.plot(obs_dict['wav_spectro'], flx_mod_calib, c = 'r')
if self.global_params.hc_type[indobs % len(self.global_params.hc_type)] != 'NA':
legend_data = 'data - star'
else:
legend_data = 'data'
ax.legend([legend_data, 'planet model'])
ax.tick_params(axis='both')
ax1.legend([legend_data, "planet model"])
ax1.set_xlabel('wavelength ($ \mu $m)')
ax1.tick_params(axis='both')
return fig1, ax1, fig, ax, flx_obs_broadened
[docs]
def compute_ccf_single_rv(self, rv, wav_mod, flx_mod, flx_mod_no_rv, res_mod_obs, wav_obs, flx_obs, res_obs, transm_obs, Sf, indobs):
'''
Compute a cross-correlation coefficient for a single rv. This function is used for the parallelised ccf computation
Args:
rv (float) : rv value to apply to the model
wav_mod (ndarray ) : wavelength grid of the model
flx_mod (ndarray) : flux of the model
flx_mod_no_rv (ndarray) : flx of the model at 0 rv (used for autocorrelation)
res_mod_obs (ndarray) : resolution of the model interpolated onto obs_wav
wav_obs (ndarray) : wavelength grid of the observation
flx_obs (ndarray) : flux of the observation
res_obs (ndarray) : resolution of the observation
transm_obs (ndarray) : atmospheric and instrumental transmission
Sf (float) : L2 norm of the observation
indobs (int) : Index of the current observation loop
Returns:
- ccf (float) : cross-correlation coefficient
- acf (float) : autocorrelation coefficient
- logL (float) : logL value
'''
wav_mod_rv, flx_mod_rv = doppler_fct(wav_mod, flx_mod, rv)
flx_mod_rv = resolution_decreasing(wav_mod_rv, flx_mod_rv, res_mod_obs, wav_obs, res_obs)
flx_cont_mod_rv = continuum_estimate(wav_obs, flx_mod_rv, res_obs, self.global_params.wav_cont[indobs % len(self.global_params.wav_cont)], float(self.global_params.res_cont[indobs % len(self.global_params.res_cont)]))
flx_mod_rv -= flx_cont_mod_rv
flx_mod_rv *= transm_obs
# Normalize the model to make it comparable to the data in terms of flux
flx_mod_rv /= np.sqrt(np.nansum(flx_mod_rv**2))
ccf = np.nansum(flx_mod_rv * flx_obs) # Cross correlation function
acf = np.nansum(flx_mod_rv * flx_mod_no_rv) # Auto correlation function
Sg = np.nansum(np.square(flx_mod_rv))
R = np.nansum(flx_obs * flx_mod_rv)
C2 = R**2 / (Sf * Sg)
logL = -len(flx_obs) / 2 * np.log(1 - C2)
return ccf, acf, logL
[docs]
def plot_ccf(self, rv_grid = [-300,300], rv_step = 0.5, figsize = (10,5), window_normalisation = 100, continuum_res = 500, vsini = [], wav_mod_nativ=[], flx_mod_nativ=[], res_mod_nativ=[], indobs=0, plot=True, map_rv_vsini = False, flx_obs = [], wav_obs = [], res_obs = [], transm_obs = []):
'''
Plot the cross-correlation function. It is used for high resolution spectroscopy.
Args:
figsize (tuple): (default = (10, 5)) Size of the plot
rv_grid (list): (default = [-300,300]) Maximum and minumum values of the radial velocity shift (in km/s)
rv_step (float): (default = 0.5) Radial velocity shift steps (in km/s)
figsize (tuple): (default = (10,5)) Size of the figure to plot
window_normalisation (int): (default = 100) Window used to exclude around the peak of the CCF for noise estimation
vsini (float): (default = []) v.sin(i) used to apply to the model (in the case the user wants to apply another v.sin(i) than the v.sin(i) estimated by the NS)
wav_mod_nativ (array): (default = []) Wavelength of the model to cross-correlate with the data in the case the user wants to use the rv_vsini map function or a different model (individual molecule for example)
flx_mod_nativ (array): (default = []) Flux of the model to cross-correlate with the data in the case the user wants to use the rv_vsini map function or a different model (individual molecule for example)
res_mod_nativ (array): (default = []) Resolution of the model to cross-correlate with the data in the case the user wants to use the rv_vsini map function or a different model (individual molecule for example)
indobs (int): (default = 0) Index of the current observation loop
plot (bool): (default = True) Whether to plot the ccf
map_rv_vsini (bool): (default = False) Whether the user wants to use the rv_vsini map function
flx_obs (array): (default = []) Data in the case the user wants to use the rv_vsini map function. This avoids repeating the same operation for each v.sini defined by the v.sini grid and sames some time
wav_obs (array): (default = []) Wavelength in the case the user wants to use the rv_vsini map function. This avoids repeating the same operation for each v.sini defined by the v.sini grid and sames some time
res_obs (array): (default = []) Resolution in the case the user wants to use the rv_vsini map function. This avoids repeating the same operation for each v.sini defined by the v.sini grid and sames some time
transm_obs (array): (default = []) Transmission in the case the user wants to use the rv_vsini map function. This avoids repeating the same operation for each v.sini defined by the v.sini grid and sames some time
Returns:
- fig1 (object): matplotlib figure object
- ax1 (object): matplotlib axes objects
- rv_grid (list): Radial velocity grid
- ccf_norm (list): Cross-correlation function
- acf_norm (list): Auto-correlation function
'''
print('ForMoSA - CCF plot')
# Gauss function used to estimate the peak of the radial velocity
def gauss(x, a, x0, sigma):
return a*np.exp(-(x-x0)**2/(2*sigma**2))
rv_grid = np.arange(rv_grid[0], rv_grid[1], rv_step)
# This condition is used to retrieve the obseervations and the nativ model.
# In the case the user uses the rv_vsini_map function, the observations and nativ models are already defined as inputs of the function
# This saves some time by avoiding the repetition of this set of operations at each v.sini of the v.sini grid in the rv_vsini_map function
if not(map_rv_vsini):
# In this case, we extract the obs
obs_dict, _, _, _, _, _ = self._get_outputs(self.theta_best)[indobs]
wav_obs, flx_obs, star_flx_obs, system_obs, res_obs, transm_obs = obs_dict['wav_spectro'], obs_dict['flx_spectro'], obs_dict['star_flx'], obs_dict['system'], obs_dict['res_spectro'], obs_dict['transm']
# Retrieve data to cross correlate the model with
flx_obs = flx_obs - star_flx_obs - system_obs # If star_flx and/or system are not define, this won't change anything
# Normalize the data
flx_obs /= np.sqrt(np.sum(flx_obs**2))
# Second step, we retrieve the native model at rv and v.sini = 0
theta_best = np.copy(self.theta_best)
try:
if len(self.global_params.rv) > 3:
theta_best[self.theta_index == f'rv_{indobs}'] = 0
else:
theta_best[self.theta_index == 'rv'] = 0
except:
pass
try:
if len(self.global_params.vsini) > 4:
theta_best[self.theta_index == f'vsini_{indobs}'] = 0
else:
theta_best[self.theta_index == 'vsini'] = 0
except:
pass
# Recover the grid
ds = xr.open_dataset(self.global_params.model_path, decode_cf=False, engine="netcdf4")
wav_mod_nativ = ds["wavelength"].values
res_mod_nativ = np.asarray(ds.attrs['res'])
_, _, _, flx_mod_nativ, _, _ = self._get_outputs(self.theta_best)[indobs]
# This condition arrises if the user does not use the rv_vsini_map function AND does not want to apply a specific vsini to the template
# in that case, we use the best v.sini infered by the nested sampling
if vsini == []:
vsini = self.theta_best[self.theta_index == 'vsini']
# Second.5, interpolate the resolution of the model
interp_mod_to_obs = interp1d(wav_mod_nativ, res_mod_nativ, fill_value='extrapolate')
res_mod_obs = interp_mod_to_obs(wav_obs)
# Third sted, we apply rotational broadening]
flx_mod_vsini, res_mod_vsini = vsini_fct(wav_mod_nativ, flx_mod_nativ, res_mod_obs, 0.6, vsini, self.global_params.vsini[indobs*4 + 3 % len(self.global_params.vsini)]) # We consider the limb darkening to be fixed at 0.6
# Finally, we generate the model at rv = 0 (for auto correlation)
self.global_params.continuum_sub[indobs] = continuum_res
flx_mod_vsini_no_rv = resolution_decreasing(wav_mod_nativ, flx_mod_vsini, res_mod_vsini, wav_obs, res_obs)
flx_cont_mod_vsini_no_rv = continuum_estimate(wav_obs, flx_mod_vsini_no_rv, res_obs, self.global_params.wav_cont[indobs % len(self.global_params.wav_cont)], float(self.global_params.res_cont[indobs % len(self.global_params.res_cont)]))
flx_mod_vsini_no_rv -= flx_cont_mod_vsini_no_rv
# Multiply by telluric and instrumental transmission (if any)
if len(transm_obs > 0):
flx_mod_vsini_no_rv *= transm_obs
flx_mod_vsini_no_rv /= np.sqrt(np.sum(flx_mod_vsini_no_rv**2))
ccf = np.zeros(len(rv_grid))
acf = np.zeros(len(rv_grid))
logL = np.zeros(len(rv_grid))
Sf = np.nansum(np.square(flx_obs))
# compute CCF with pool of workers
with ThreadPool(processes=mp.cpu_count()) as pool:
pbar = tqdm(total=len(rv_grid), leave=False)
def update(*a):
pbar.update()
# Loop in rv
tasks = []
for rv in tqdm(rv_grid):
tasks.append(pool.apply_async(compute_ccf_single_rv, args=(self, rv, wav_mod_nativ, flx_mod_vsini, flx_mod_vsini_no_rv, res_mod_vsini, wav_obs, flx_obs, res_obs, transm_obs, Sf, indobs), callback=update))
pool.close()
pool.join()
# extract results
ccf = np.zeros(rv_grid.size)
acf = np.zeros(rv_grid.size)
logL = np.zeros(rv_grid.size)
for irv, task in enumerate(tasks):
res = task.get()
ccf[irv] = res[0]
acf[irv] = res[1]
logL[irv] = res[2]
# Rescaling cross-correlation function to estimate a SNR
acf_norm = acf - np.median(acf[(np.abs(rv_grid) > window_normalisation)])
ccf_norm = ccf - np.median(ccf[(np.abs(rv_grid-rv_grid[np.argmax(ccf)]) > window_normalisation)])
ccf_noise = np.std(ccf_norm[(np.abs(rv_grid-rv_grid[np.argmax(ccf)]) > window_normalisation)])
ccf_norm = ccf_norm / ccf_noise
#sigma_ccf = sigma_ccf / ccf_noise
if (plot) and (not(map_rv_vsini)):
# Rescaling autocorrelation function to make it comparable with cross-correlation function
acf_norm = acf_norm / np.max(acf_norm) * np.max(ccf_norm)
ind_curve_fit = np.abs(rv_grid - rv_grid[np.argmax(ccf_norm)]) < 15
rv = rv_grid[np.argmax(ccf_norm)]
p0 = [ccf_norm[np.argmax(ccf_norm)], rv, self.theta_best[self.theta_index=='vsini'][0]]
popt, pcov = curve_fit(gauss, rv_grid[ind_curve_fit], ccf_norm[ind_curve_fit], p0=p0)
acf = gauss(rv_grid, popt[0], popt[1], popt[2])
fig1, ax1 = plt.subplots(1,1, figsize=figsize)
ax1.plot(rv_grid, ccf_norm, label = 'ccf')
ax1.plot(rv_grid + popt[1], acf_norm)
ax1.axvline(x = popt[1], linestyle = '--', c='C3')
ax1.set_xlabel('RV (km/s)')
ax1.set_ylabel('S/N')
ax1.legend(['ccf', 'acf'])
print(f'SNR = {np.nanmax(ccf_norm):.1f}, RV = {popt[1]:.1f} km/s')
return fig1, ax1, rv_grid, ccf_norm, acf_norm, ccf_noise, logL
elif (not(plot)) and (not(map_rv_vsini)):
# Rescaling autocorrelation function to make it comparable with cross-correlation function
acf_norm = acf_norm / np.max(acf_norm) * np.max(ccf_norm)
ind_curve_fit = np.abs(rv_grid - rv_grid[np.argmax(ccf_norm)]) < 15
rv = rv_grid[np.argmax(ccf_norm)]
p0 = [ccf_norm[np.argmax(ccf_norm)], rv, self.theta_best[self.theta_index=='vsini'][0]]
popt, pcov = curve_fit(gauss, rv_grid[ind_curve_fit], ccf_norm[ind_curve_fit], p0=p0)
acf = gauss(rv_grid, popt[0], popt[1], popt[2])
print(f'SNR = {np.nanmax(ccf_norm):.1f}, RV = {popt[1]:.1f} km/s')
return rv_grid, ccf_norm, acf_norm, ccf_noise, logL
else:
return rv_grid, logL
[docs]
def plot_map_rv_vsini(self, rv_grid = [-100,100], rv_step = 1.0, vsini_grid=[1,100], vsini_step = 1.0, wav_mod_nativ=[], flx_mod_nativ=[], res_mod_nativ=[], indobs=0, continuum_res=500):
'''
Plot a RV v.sini map. It is used for high resolution spectroscopy
Args:
rv_grid (list): (default = [-100,100]) Maximum and minumum values of the radial velocity shift (in km/s)
rv_step (float): (default = 0.5) Radial velocity shift steps (in km/s)
vsini_grid (list): (default = [1,100]) Maximum and minimum values of the v.sini broadening (in km/s)
vsini_step (float): (default = 1.0) v.sini broadening steps (in km/s)
wav_mod_nativ (array): (default = []) Wavelength of the model to cross-correlate with the data in the case the user wants to use a different model (individual molecule for example)
flx_mod_nativ (array): (default = []) Flux of the model to cross-correlate with the data in the case the user wants to use a different model (individual molecule for example)
res_mod_nativ (array): (default = []) Resolution of the model to cross-correlate with the data in the case the user wants to use a different model (individual molecule for example)
Returns:
- ccf_map (ndarray): 2D cross correlation map of rv and v.sini
- fig (object) : matplotlib figure object
- ax (object) : matplotlib axes objects
'''
print('ForMoSA - RV-vsini mapping plot')
vsini_grid = np.arange(vsini_grid[0], vsini_grid[1], vsini_step)
logL_map = np.empty((len(vsini_grid), int((rv_grid[1] - rv_grid[0]) / rv_step)))
# First step, we retrieve the star and systematics contaminations associated to the best model (if any)
obs_dict, _, _, _, _, _ = self._get_outputs(self.theta_best)[indobs]
wav_obs, flx_obs, star_flx_obs, system_obs, res_obs, transm_obs = obs_dict['wav_spectro'], obs_dict['flx_spectro'], obs_dict['star_flx'], obs_dict['system'], obs_dict['res_spectro'], obs_dict['transm']
# Retrieve data to cross correlate the model with
flx_obs = flx_obs - star_flx_obs - system_obs
# Normalize the data
flx_obs /= np.sqrt(np.sum(flx_obs**2))
# Second step, we retrieve the native model at rv and v.sini = 0
if (len(flx_mod_nativ) == 0) and (len(res_mod_nativ) == 0):
theta_best = np.copy(self.theta_best)
try:
if len(self.global_params.rv) > 3:
theta_best[self.theta_index == f'rv_{indobs}'] = 0
else:
theta_best[self.theta_index == 'rv'] = 0
except:
pass
try:
if len(self.global_params.vsini) > 4:
theta_best[self.theta_index == f'vsini_{indobs}'] = 0
else:
theta_best[self.theta_index == 'vsini'] = 0
except:
pass
# Recover the grid
ds = xr.open_dataset(self.global_params.model_path, decode_cf=False, engine="netcdf4")
wav_mod_nativ = ds["wavelength"].values
res_mod_nativ = np.asarray(ds.attrs['res'])
_, _, _, flx_mod_nativ, _, _ = self._get_outputs(self.theta_best)[indobs]
else:
pass
for i, vsini_i in enumerate(tqdm(vsini_grid)):
grid, logL = self.plot_ccf(rv_grid=rv_grid, rv_step=rv_step, vsini=vsini_i, plot=False, wav_mod_nativ=wav_mod_nativ, flx_mod_nativ=flx_mod_nativ, res_mod_nativ=res_mod_nativ, map_rv_vsini=True, flx_obs=flx_obs, wav_obs=wav_obs, res_obs = res_obs, transm_obs = transm_obs)
logL_map[i] = logL
rv_grid = grid
max_indices = np.unravel_index(np.argmax(logL_map), logL_map.shape)
rv_peak, vsini_peak = rv_grid[max_indices[1]], vsini_grid[max_indices[0]]
fig = plt.figure('rv-vsin(i) map', figsize=(8,5))
ax = fig.add_subplot()
im = ax.imshow(logL_map, cmap=plt.cm.RdBu_r, extent=[rv_grid[0], rv_grid[-1],vsini_grid[0],vsini_grid[-1]])
ax.set_xlabel('RV (km/s)')
ax.set_ylabel('$v\,\sin i$ [km/s]')
ax.axhline(y=vsini_peak, linestyle='--', c='C3')
ax.axvline(x=rv_peak, linestyle='--', c='C3')
ax.set_title(f'RV = {rv_peak:.1f} km/s, $v\,\sin i$ = {vsini_peak:.1f} km/s')
cbar = fig.colorbar(im)
cbar.set_label("logL", fontsize=22, labelpad=10)
return logL_map, fig, ax
[docs]
def plot_PT_chem(self, PT_chem, photosphere={}, par_to_plot=['temperature'], figsize=(10,5)):
'''
Function to plot the Pressure-Temperature profiles and associated vmr/molecular profiles.
Args:
PT_chem (dict): Dictionary containing the pressure, temperature and chemical arrays
photosphere (dict): Dictionary containing the P/T profile of the photosphere
par_to_plot (list(str)): Key list of the parameters from the PT_chem you want to plot
Returns:
- fig (object): matplotlib figure object
- ax (object): matplotlib axes objects
- ax_twin (object): matplotlib axes objects
'''
print('ForMoSA - Pressure-Temperature profile')
# Initialize plot
fig, ax = plt.subplots(1,1, figsize=figsize)
ax_twin = ax.twiny()
# Iterate on each parameter
for i_par, par in enumerate(par_to_plot):
# Temperature
if par == 'temperature':
ax.plot(PT_chem["temperature_q50"], PT_chem["pressure"], color=self.color_out, label='Best-fit')
ax.fill_betweenx(PT_chem["pressure"], PT_chem["temperature_q16"], PT_chem["temperature_q84"], color=self.color_out, alpha=0.1, label=r'2 $\sigma$')
ax.fill_betweenx(PT_chem["pressure"], PT_chem["temperature_q2"], PT_chem["temperature_q98"], color=self.color_out, alpha=0.2, label=r'1 $\sigma$')
else:
ax_twin.plot(PT_chem[par + '_q50'], PT_chem["pressure"], label=par)
# Add photosphere if necessary
if len(photosphere) != 0:
ax.plot(photosphere["temperature"], photosphere["pressure"], color='red', linestyle='--', label='photosphere')
# Plot the legend
# First ax
ax.set_xlabel('Temperature (K)')
ax.set_ylabel('Pressure (bar)')
ax.set_yscale('log')
ax.tick_params(axis='both', which='both')
# Second ax
ax_twin.set_xlabel('abundance/vmr')
ax_twin.set_xscale('log')
# ask matplotlib for the plotted objects and their labels
lines, labels = ax.get_legend_handles_labels()
lines2, labels2 = ax_twin.get_legend_handles_labels()
ax_twin.legend(lines + lines2, labels + labels2)
ax_twin.tick_params(axis='both', which='both')
return fig, ax, ax_twin