Source code for ForMoSA.adapt.adapt_extraction_functions
from __future__ import division
import numpy as np
from astropy.io import fits
from scipy.interpolate import interp1d
from ForMoSA.utils_spec import resolution_decreasing, continuum_estimate
[docs]
def adapt_observation(global_params, wav_mod_nativ, res_mod_nativ, obs_name, indobs=0):
"""
Take back the extracted data spectrum from the function 'extract_observation', decrease its spectral
resolution and remove the continuum if necessary
Args:
global_params (object): Class containing each parameter
wav_mod_nativ (array): Wavelength grid of the model
res_mod_nativ (array): Spectral resolution of the model
obs_name (str): Name of the current observation looping
indobs (int): Index of the current observation looping
Returns:
- obs_dict (dict): Dictionay containing all the observationnal entries (photometry, spectroscopy and/or optional)
Author: Simon Petrus, Matthieu Ravet
"""
# Extract the wavelengths, flux, errors, spectral resolution, and instrument/filter names from the observation file.
obs_dict = extract_observation(global_params, indobs=indobs)
# Decrease the resolution and remove the continuum if necessary
if len(obs_dict['wav_spectro']) != 0:
# - - - - - -
# Setup target resolution for the observation
# Interpolate the resolution of the model onto the wavelength of the data to properly decrease the resolution if necessary
interp_mod_to_obs = interp1d(wav_mod_nativ, res_mod_nativ, fill_value='extrapolate')
res_mod_obs = interp_mod_to_obs(obs_dict['wav_spectro'])
if global_params.target_res_obs[indobs % len(global_params.target_res_obs)] == 'obs': # Keeping the resolution of the observation except where its higher than the model's
target_res_obs = np.min([obs_dict['res_spectro'], res_mod_obs], axis=0)
else: # Using a custom resolution except where its higher than the model's or the observation's
res_custom = np.full(len(obs_dict['res_spectro']), float(global_params.target_res_obs[indobs % len(global_params.target_res_obs)]))
target_res_obs = np.min([obs_dict['res_spectro'], res_mod_obs, res_custom], axis=0)
# - - - - - -
# If we want to decrease the resolution of the spectroscopic data:
obs_dict['flx_spectro'] = resolution_decreasing(obs_dict['wav_spectro'],
obs_dict['flx_spectro'],
obs_dict['res_spectro'],
obs_dict['wav_spectro'],
target_res_obs)
obs_dict['transm'] = resolution_decreasing(obs_dict['wav_spectro'],
obs_dict['transm'],
obs_dict['res_spectro'],
obs_dict['wav_spectro'],
target_res_obs)
obs_dict['star_flx'] = np.asarray([resolution_decreasing(obs_dict['wav_spectro'],
obs_dict['star_flx'][:,i],
obs_dict['res_spectro'],
obs_dict['wav_spectro'],
target_res_obs)
for i in range(obs_dict['star_flx'].shape[-1])]).T
obs_dict['system'] = np.asarray([resolution_decreasing(obs_dict['wav_spectro'],
obs_dict['system'][:,i],
obs_dict['res_spectro'],
obs_dict['wav_spectro'],
target_res_obs)
for i in range(obs_dict['system'].shape[-1])]).T
# Since the resolution of the observation might have change, we need to save the new one
obs_dict['res_spectro'] = target_res_obs
# Compute the inverse covariance and log-determinant (if necessary)
if len(obs_dict['cov']) != 0:
obs_dict['inv_cov'] = np.linalg.inv(obs_dict['cov']) # Save the inverse covariance to speed up the inversion
else:
pass
# If we want to estimate and substract the continuum of the data:
if global_params.res_cont[indobs % len(global_params.res_cont)] != 'NA':
print()
print(obs_name + ' will have a R=' + global_params.res_cont[indobs % len(global_params.res_cont)] + ' continuum removed using a '
+ global_params.wav_cont[indobs % len(global_params.wav_cont)] + ' wavelength range')
print()
obs_dict['flx_spectro_cont'] = continuum_estimate(obs_dict['wav_spectro'],
obs_dict['flx_spectro'],
obs_dict['res_spectro'],
global_params.wav_cont[indobs % len(global_params.wav_cont)],
float(global_params.res_cont[indobs % len(global_params.res_cont)]))
# If you don't use hc models, the data continuum is directly removed
if global_params.hc_type[indobs % len(global_params.hc_type)] == 'NA':
obs_dict['flx_spectro'] -= obs_dict['flx_spectro_cont']
else: # If you use hc models, the data is kept; we just need to estimate the continuum of the star flux as well
obs_dict['star_flx_cont'] = continuum_estimate(obs_dict['wav_spectro'],
obs_dict['star_flx'][:,len(obs_dict['star_flx'][0]) // 2], # Continuum of the star on the central pixel
obs_dict['res_spectro'],
global_params.wav_cont[indobs % len(global_params.wav_cont)],
float(global_params.res_cont[indobs % len(global_params.res_cont)]))
return obs_dict
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def extract_observation(global_params, indobs=0):
"""
Extract the information from the observation file, including the wavelengths (um - vacuum), flux (W.m-2.um.1), errors (W.m-2.um.1), covariance (W.m-2.um.1)**2, spectral resolution,
instrument/filter name, transmission (Atmo+inst) and star flux (W.m-2.um.1). The wavelength range is define by the parameter "wav_for_adapt".
Args:
global_params (object): Class containing each parameter
obs_name (str): Name of the current observation looping
indobs (int): Index of the current observation looping
Returns:
- obs_dict (dict): Dictionay containing all the observationnal entries (photometry, spectroscopy and/or optional)
Author: Simon Petrus, Matthieu Ravet and Allan Denis
"""
# Extraction
with fits.open(global_params.observation_path) as hdul:
# Check the format of the file and extract data accordingly
wav = hdul[1].data['WAV']
flx = hdul[1].data['FLX']
res = hdul[1].data['RES']
ins = hdul[1].data['INS']
try: # Check for spectral covariances
err = hdul[1].data['ERR']
cov = np.asarray([]) # Create an empty covariance matrix if not already present in the data (to not slow the inversion)
except:
cov = hdul[1].data['COV']
err = np.sqrt(np.diag(np.abs(cov)))
try: # Check for transmission
transm = hdul[1].data['TRANSM']
except:
transm = np.asarray([])
try: # Check for star flux
star_flx = hdul[1].data['STAR_FLX1'][:,np.newaxis]
is_star = True
except:
star_flx = np.asarray([])
is_star = False
if is_star:
i = 2
while True: # In case there is multiple star flux (usually shifted to account for the PSF)
try:
star_flx = np.concatenate((star_flx, hdul[1].data['STAR_FLX' + str(i)][:,np.newaxis]),axis=1)
i += 1
except:
break
try:
is_system = True
system = hdul[1].data['SYSTEMATICS1'][:,np.newaxis]
except:
is_system = False
system = np.asarray([])
if is_system:
i = 2
while True: # In case there is multiple systematics
try:
system = np.concatenate((system, hdul[1].data['SYSTEMATICS' + str(i)][:,np.newaxis]),axis=1)
i += 1
except:
break
# Only take the covariance if you use the chi2_covariance likelihood function (will need to be change when new likelihood functions using the
# covariance matrix will come)
if global_params.logL_type[indobs % len(global_params.logL_type)] != 'chi2_covariance' and global_params.logL_type[indobs % len(global_params.logL_type)] != 'chi2_noisescaling_covariance':
cov = np.asarray([])
# Filter the NaN and inf values
nan_mod_ind = (~np.isnan(flx)) & (~np.isnan(err)) & (np.isfinite(flx)) & (np.isfinite(err))
if len(cov) != 0:
nan_mod_ind = (nan_mod_ind) & np.all(~np.isnan(cov), axis=0) & np.all(~np.isnan(cov), axis=1) & np.all(np.isfinite(cov), axis=0) & np.all(np.isfinite(cov), axis=1)
if len(transm) != 0:
nan_mod_ind = (nan_mod_ind) & (~np.isnan(transm)) & (np.isfinite(transm))
if len(star_flx) != 0:
for i in range(len(star_flx[0])):
nan_mod_ind = (nan_mod_ind) & (~np.isnan(star_flx.T[i])) & (np.isfinite(star_flx.T[i]))
if len(system) != 0:
for i in range(len(system[0])):
nan_mod_ind = (nan_mod_ind) & (~np.isnan(system.T[i])) & (np.isfinite(system.T[i]))
wav = wav[nan_mod_ind]
flx = flx[nan_mod_ind]
res = res[nan_mod_ind]
ins = ins[nan_mod_ind]
err = err[nan_mod_ind]
if len(cov) != 0:
cov = cov[np.ix_(nan_mod_ind, nan_mod_ind)]
if len(transm) != 0 and len(star_flx) != 0:
transm = transm[nan_mod_ind]
if len(star_flx) != 0:
star_flx = np.delete(star_flx, np.where(~nan_mod_ind), axis=0)
if len(system) != 0:
system = np.delete(system, np.where(~nan_mod_ind), axis=0)
# - - - - - - - - -
# Separate photometry from spectroscopy
mask_photo = (res == 0.0)
wav_photo, wav_spectro = wav[mask_photo], wav[~mask_photo]
flx_photo, flx_spectro = flx[mask_photo], flx[~mask_photo]
ins_photo, res_spectro = ins[mask_photo], res[~mask_photo]
err_phot, err_spectro = err[mask_photo], err[~mask_photo]
if len(cov) != 0:
cov = cov[np.ix_(~mask_photo, ~mask_photo)]
if len(transm) != 0 and len(star_flx) != 0:
transm = transm[~mask_photo]
if len(star_flx) != 0:
star_flx = np.delete(star_flx, np.where(mask_photo), axis=0)
if len(system) != 0:
system = np.delete(system, np.where(mask_photo), axis=0)
# - - - - - - - - -
# Observation dictionary
obs_dict = {'wav_photo': wav_photo, # Photometry part
'flx_photo': flx_photo,
'err_photo': err_phot,
'ins_photo': ins_photo,
'wav_spectro': wav_spectro, # Spectroscopy part
'flx_spectro': flx_spectro,
'err_spectro': err_spectro,
'res_spectro': res_spectro,
'cov': cov,
'transm': transm,
'star_flx': star_flx,
'system': system
}
return obs_dict
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def adapt_model(global_params, obs_dict, wav_mod_nativ, flx_mod_nativ, res_mod_nativ, target_wav_mod, target_res_mod, indobs=0):
"""
Extracts a synthetic spectrum from a grid and decreases its spectral resolution. The photometry points are
calculated too.
Args:
global_params (object): Class containing each parameter used in ForMoSA
obs_dict (dict): Dictionay containing all the observationnal entries (photometry, spectroscopy and/or optional)
wav_mod_nativ (array): Native wavelength grid of the model
flx_mod_nativ (array): Flux of the model
res_mod_nativ (array): Spectral resolution of the model
target_wav_mod (array): Targeted wavelength grid of the final grid
target_res_mod (array): Targeted spectral resolution of the final grid
indobs (int): Index of the current observation looping
Returns:
- mod_spectro (array): List containing the sub-spectra defined by the parameter "wav_for_adapt".
- mod_photo (array): List containing the photometry ('0' replace the spectral resolution here).
Author: Simon Petrus, Matthieu Ravet
"""
# Spectroscopy part
mod_spectro = np.empty(len(target_wav_mod), dtype=float)
# If we want to decrease the resolution of the model (if necessary):
if len(obs_dict['wav_spectro']) != 0:
mod_spectro = resolution_decreasing(wav_mod_nativ,
flx_mod_nativ,
res_mod_nativ,
target_wav_mod,
target_res_mod)
# If we want to estimate and substract the continuum of the data (except for high contrast where we need to keeo the og spectrum):
if global_params.res_cont[indobs % len(global_params.res_cont)] != 'NA' and global_params.hc_type[indobs % len(global_params.hc_type)] == 'NA':
mod_spectro -= continuum_estimate(target_wav_mod,
mod_spectro,
target_res_mod,
global_params.wav_cont[indobs % len(global_params.wav_cont)],
float(global_params.res_cont[indobs % len(global_params.res_cont)]))
# Photometry part
mod_photo = np.empty(len(obs_dict['wav_photo']), dtype=float)
if len(obs_dict['wav_photo']) != 0:
# Calculate each photometry point (if necessary)
for pho_ind, pho in enumerate(obs_dict['ins_photo']):
path_list = __file__.split("/")[:-2]
separator = '/'
filter_pho = np.load(separator.join(path_list) + '/phototeque/' + pho + '.npz')
x_filt = filter_pho['x_filt']
y_filt = filter_pho['y_filt']
filter_interp = interp1d(x_filt, y_filt, fill_value="extrapolate")
y_filt = filter_interp(wav_mod_nativ)
ind = np.where(np.logical_and(wav_mod_nativ > min(x_filt), wav_mod_nativ < max(x_filt)))
flx_filt = np.sum(flx_mod_nativ[ind] * y_filt[ind] * (wav_mod_nativ[ind][1] - wav_mod_nativ[ind][0]))
y_filt_tot = np.sum(y_filt[ind] * (wav_mod_nativ[ind][1] - wav_mod_nativ[ind][0]))
flx_filt = flx_filt / y_filt_tot
mod_photo[pho_ind] = flx_filt
return mod_spectro, mod_photo