Source code for ForMoSA.adapt.adapt_obs_mod

from __future__ import print_function, division
import numpy as np
import os
import glob
import xarray as xr
from scipy.interpolate import interp1d

from ForMoSA.adapt.adapt_grid import adapt_grid
from ForMoSA.adapt.adapt_extraction_functions import adapt_observation

# ----------------------------------------------------------------------------------------------------------------------


[docs] def launch_adapt(global_params, adapt_model=True): """ Adapt the synthetic spectra of a grid to make them comparable with the data. Args: global_params (object): Class containing each parameter adapt_model (bool): True by default to also adapt the grid Returns: None Author: Simon Petrus, Matthieu Ravet, Paulina Palma-Bifani and Allan Denis """ # Get back the grid information from the config file ds = xr.open_dataset(global_params.model_path, decode_cf=False, engine="netcdf4") wav_mod_nativ = ds["wavelength"].values attr = ds.attrs res_mod_nativ = attr['res'] # Check if the grid is Nyquist-sampled, else set the resolution to R = wav / 2 Deltawav to make sure we not are adding any info dwav = np.abs(wav_mod_nativ - np.roll(wav_mod_nativ, 1)) dwav[0] = dwav[1] res_Nyquist = wav_mod_nativ / (2 * dwav) res_mod_nativ[(res_mod_nativ > res_Nyquist)] = res_Nyquist[(res_mod_nativ > res_Nyquist)] # Extract the data from the observation files main_obs_path = global_params.main_observation_path for indobs, obs in enumerate(sorted(glob.glob(main_obs_path))): global_params.observation_path = obs obs_name = os.path.splitext(os.path.basename(global_params.observation_path))[0] obs_dict = adapt_observation(global_params, wav_mod_nativ, res_mod_nativ, obs_name, indobs=indobs) # Check-ups and warnings for negative values in the diagonal of the covariance matrix if len(obs_dict['wav_spectro']) != 0 and any(np.diag(obs_dict['cov']) < 0): print() print("WARNING: Negative value(s) is(are) present on the diagonal of the covariance matrix.") print("Operation aborted.") print() exit() # Save the data np.savez(os.path.join(global_params.result_path, f'spectrum_obs_{obs_name}.npz'), **obs_dict) # - - - - - - - - # Adaptation of the model grid if adapt_model == True: # Creation of the repertory to store the adapted grid (if needed) if os.path.isdir(global_params.adapt_store_path): pass else: os.mkdir(global_params.adapt_store_path) if len(obs_dict['wav_spectro']) != 0: # Setup target wavelength and resolution for the observation and the model if global_params.target_res_mod[indobs % len(global_params.target_res_mod)] == 'mod': # Kepping the model's resolution target_wav_mod = wav_mod_nativ target_res_mod = res_mod_nativ elif global_params.target_res_mod[indobs % len(global_params.target_res_mod)] == 'obs': # Using the observation's resolution except where its higher than the model's target_wav_mod = obs_dict['wav_spectro'] target_res_mod = obs_dict['res_spectro'] else: # Using a custom resolution except where its higher than the model's res_custom = np.full(len(wav_mod_nativ), float(global_params.target_res_mod[indobs])) target_wav_mod = wav_mod_nativ target_res_mod = np.min([res_mod_nativ, res_custom], axis=0) # Masks to have larger cuts of the spectroscopic grid if needed (if rv is defined) if global_params.rv[indobs*3 % len(global_params.rv)] == 'NA': mask_mod_obs = (target_wav_mod <= obs_dict['wav_spectro'][-1]) & (target_wav_mod >= obs_dict['wav_spectro'][0]) target_wav_mod = target_wav_mod[mask_mod_obs] target_res_mod = target_res_mod[mask_mod_obs] else: mask_mod_obs = (target_wav_mod <= 1.01 * obs_dict['wav_spectro'][-1]) & (target_wav_mod >= 0.99 * obs_dict['wav_spectro'][0]) # 1.01 corresponds to a value of 3000 km/s for the RV so we do no risk to lose data on the edges when applying the RV correction target_wav_mod = target_wav_mod[mask_mod_obs] target_res_mod = target_res_mod[mask_mod_obs] # Interpolate the resolution of the model onto the wavelength of the data to properly decrease the resolution if necessary interp_mod_to_obs = interp1d(wav_mod_nativ, res_mod_nativ, fill_value='extrapolate') res_mod_nativ_interp = interp_mod_to_obs(target_wav_mod) else: target_wav_mod = np.asarray([]) target_res_mod = np.asarray([]) res_mod_nativ_interp = np.asarray([]) print() print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -') print("-> To compare synthetic spectra with the observation we need to manage them. The following actions are performed:") print("- extraction -") print("- resizing on the observation's wavelength range -") print("- adjustement of the spectral resolution -") print("- substraction of the continuum (if needed) -") print() print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -') print(f"-> Sarting the adaptation of {obs_name}") adapt_grid(global_params, obs_dict, res_mod_nativ_interp, target_wav_mod, target_res_mod, obs_name, indobs)
# ---------------------------------------------------------------------------------------------------------------------- if __name__ == '__main__': from ForMoSA.global_file import GlobFile # USER configuration path print() print('- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -') print('-> Configuration of environment') print('Where is your configuration file?') config_file_path = input() print() # CONFIG_FILE reading and defining global parameters global_params = GlobFile(config_file_path) # To access any param.: global_params.parameter_name launch_adapt(global_params, True)