import numpy as np
from scipy.ndimage import gaussian_filter
from scipy.interpolate import interp1d
import extinction
import astropy.units as u
import astropy.constants as const
from PyAstronomy.pyasl import rotBroad, fastRotBroad
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
from tqdm import tqdm
from ForMoSA.core.enums import VsiniFunction
from scipy import optimize
[docs]
def calc_ck(flx_mod: np.ndarray, flx_obs: np.ndarray, err_obs: np.ndarray, r_picked: float, d_picked: float, analytic: str = 'no', bounds: tuple[float, float] = (-float('inf'), float('inf'))) -> tuple[np.ndarray, float]:
'''
Calculation of the dilution factor Ck and re-normalization of the synthetic spectrum.
Parameters
----------
flx_mod : np.ndarray
Flux of the model spectrum
flx_obs : np.ndarray
Flux of the data
err_obs : np.ndarray
Error of the data
r_picked : float
Radius (Rjup)
d_picked : float
Distance (pc)
analytic : str
If 'yes', compute Ck by linear least-squares fitting, else use alpha*(R/d)^2
bounds : tuple[float, float]
Bounds for the Least Squares
Returns
-------
flx_mod_ck : np.ndarray
Scaled model flux
ck : float
Scaling coefficient
Notes
-----
Authors: Simon Petrus and Allan Denis
'''
# Fixed analytical scaling
if analytic == 'no':
r_picked *= u.Rjup
d_picked *= u.pc
ck = (r_picked.to(u.m).value/d_picked.to(u.m).value)**2
flx_mod_ck = flx_mod * ck
else:
# Fit mode using fit_linear_model
A = [flx_mod]
b = flx_obs
# Solve via fit_linear_model
result = fit_linear_model(components=A, flx_obs=b, err_obs=err_obs, bounds=bounds)
flx_mod_ck = result['model']
ck = result['coeffs'][0]
return flx_mod_ck, ck
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def convolve_and_sample(wv_channels: list, sigmas_wvs: list, model_wvs: np.ndarray, model_fluxes: np.ndarray, num_sigma: int=3, force_int: bool=True) -> np.ndarray: # num_sigma = 3 is a good compromise between sampling enough the gaussian and fast interpolation
"""
Simulate the observations of a model. Convolves the model with a variable Gaussian LSF, sampled at each desired
spectral channel.
Parameters
----------
wv_channels : list(floats)
the wavelengths values desired
sigmas_wvs : list(floats)
the LSF gaussian standard deviation of each wv_channels [IN UNITS OF model_wvs]
model_wvs : array
the wavelengths of the model
model_fluxes : array
the fluxes of the model
num_sigma : int
number of +/- sigmas to evaluate the LSF to.
force_int : bolean
False by default. If True, will force interpolation onto wv_channels when the kernel is singular
Returns
-------
array
the fluxes in each of the wavelength channels
Notes
-----
Authors: Jason Wang
"""
model_in_range = np.where((model_wvs >= np.min(wv_channels)) & (model_wvs < np.max(wv_channels)))
dwv_model = np.abs(model_wvs[model_in_range] - np.roll(model_wvs[model_in_range], 1))
dwv_model[0] = dwv_model[1]
filter_size = int(np.ceil(np.max((2 * num_sigma * sigmas_wvs) / np.min(dwv_model))))
filter_coords = np.linspace(-num_sigma, num_sigma, filter_size)
filter_coords = np.tile(filter_coords, [wv_channels.shape[0], 1]) # shape of (N_output, filter_size)
filter_wv_coords = filter_coords * sigmas_wvs[:, None] + wv_channels[:, None] # model wavelengths we want
lsf = np.exp(-filter_coords ** 2 / 2) / np.sqrt(2 * np.pi)
left_fill = model_fluxes[model_in_range][0]
right_fill = model_fluxes[model_in_range][-1]
model_interp = interp1d(model_wvs, model_fluxes, kind='cubic', bounds_error=False, fill_value=(left_fill,right_fill))
if np.sum(lsf) != 0:
filter_model = model_interp(filter_wv_coords)
output_model = np.nansum(filter_model * lsf, axis=1) / np.sum(lsf, axis=1)
else:
if force_int == True:
output_model = model_interp(wv_channels)
else:
output_model = model_fluxes
return output_model
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def resolution_decreasing(wav_input: np.ndarray, flx_input: np.ndarray, res_input: np.ndarray, wav_output: np.ndarray, res_output: np.ndarray) -> np.ndarray:
"""
Decrease the resolution of a spectrum by convolving with a Gaussian kernel.
Parameters
----------
wav_input : array
Wavelength grid of the input
flx_input : array
Flux of the input
res_input : array
Spectral resolution at wav_input
wav_output : array
Desired output wavelength grid
res_output : array
Spectral resolution at wav_output
Returns
-------
array
Flux at lower resolution, resampled to wav_output
Notes
-----
Authors: Simon Petrus
"""
if len(flx_input) == 0 or len(wav_input) == 0:
return np.array([])
# Interpolation to common resolution
# Use edge fill values so that observation points just outside the (RV-trimmed) model
# range get the nearest edge resolution rather than NaN, which would propagate through
# fwhm_conv -> sigma_conv and crash convolve_and_sample.
res_in_interp = interp1d(wav_input, res_input, kind='linear', bounds_error=False,
fill_value=(res_input[0], res_input[-1]))
res_out_interp = interp1d(wav_output, res_output, kind='linear', bounds_error=False,
fill_value=(res_output[0], res_output[-1]))
res_in = res_in_interp(wav_output)
res_out = res_out_interp(wav_output)
# FWHM and convolution width
fwhm_in = wav_output / res_in
fwhm_out = wav_output / res_out
fwhm_conv = np.sqrt(np.maximum(fwhm_out**2 - fwhm_in**2, 0.0)) # avoid sqrt of negative
sigma_conv = fwhm_conv / 2.355 # convert FWHM to sigma
# Apply convolution
flx_output = convolve_and_sample(wav_output, sigma_conv, wav_input, flx_input, force_int=True)
return flx_output
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def continuum_estimate(wav_input: np.ndarray, flx_input: np.ndarray, res_input: np.ndarray, wav_cont_bounds: str, res_cont: float) -> np.ndarray:
"""
Decrease the resolution of a spectrum (data or model). The function calculates the FWHM as a function of the
wavelengths of the custom spectral resolution (estimated for the continuum). It then calculates a sigma to decrease
the resolution of the spectrum to this custom FWHM for each wavelength using a gaussian filter and resample it on
the wavelength grid of the data.
Parameters
----------
wav_input : np.ndarray
Wavelength grid of the spectrum for which you want to estimate the continuum
flx_input : np.ndarray
Flux of the spectrum for which you want to estimate the continuum
res_input : np.ndarray
Spectral resolution of the spectrum for which you want to estimate the continuum
wav_cont_bounds : str
Wavelength bounds where you want to estimate the continuum
res_cont : float
Approximate resolution of the continuum
Returns
-------
np.ndarray
Estimated continuum of the spectrum re-sampled on the data wavelength grid
Notes
-----
Authors: Simon Petrus, Matthieu Ravet
"""
# Initialize
flx_cont = np.asarray([])
wav_cont = np.asarray([])
# Redifine a spectrum only composed by the wavelength ranges used to estimate the continuum
for wav_cont_cut in wav_cont_bounds.split('/'):
wav_cont_cut = wav_cont_cut.split(',')
ind_cont_cut = np.where((float(wav_cont_cut[0]) <= wav_input) & (wav_input <= float(wav_cont_cut[1])))
# To limit the computing time, the convolution is not as a function of the wavelength but calculated
# from the median wavelength. We just want an estimate of the continuum here.
wav_median = np.median(wav_input[ind_cont_cut])
dwav_median = np.median(np.abs(wav_input[ind_cont_cut] - np.roll(wav_input[ind_cont_cut], 1))) # Estimated the median wavelength separation instead of taking wav_median - (wav_median+1) that could be on a border
fwhm = wav_median / np.median(res_input)
fwhm_continuum = wav_median / float(res_cont)
fwhm_conv = np.sqrt(fwhm_continuum**2 - fwhm**2)
sigma = fwhm_conv / (dwav_median * 2.355)
cont = gaussian_filter(flx_input[ind_cont_cut], sigma)
# Concatenate everything
wav_cont = np.concatenate((wav_cont, wav_input[ind_cont_cut]))
flx_cont = np.concatenate((flx_cont, cont))
# Reinterpolate onto the original wavelength grid
continuum_interp = interp1d(wav_cont, flx_cont, kind='linear', fill_value = 'extrapolate')
continuum = continuum_interp(wav_input)
return continuum
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def doppler_fct(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, res_mod_spectro: np.ndarray, rv_picked: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Application of a Doppler shifting to the interpolated synthetic spectrum using the function pyasl.dopplerShift.
The side effects of the Doppler shifting are taking into account by using a model interpolated on a larger wavelength grid as the wavelength grid of the data.
After the Doppler shifting, the model is then cut to the wavelength of the data.
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum
res_mod_spectro : array
Resolution of the model
rv_picked : float
Radial velocity randomly picked by the nested sampling (in km.s-1)
Returns
-------
array
Wavelength grid after Doppler shifting
array
New flux of the interpolated synthetic spectrum
array
New resolution of the interpolated synthetic spectrum
Notes
-----
Authors: Simon Petrus, Allan Denis and Matthieu Ravet
"""
if len(flx_mod_spectro) != 0:
new_wav = wav_mod_spectro * ((rv_picked / const.c.to(u.km/u.s).value) + 1)
rv_interp = interp1d(new_wav, flx_mod_spectro, bounds_error=False)
flx_post_doppler = rv_interp(wav_mod_spectro)
# Remove the nans caused by the RV correction
# Note: this step is not problematic as the wavelength range of the model is slightly larger than the wavelength range of the data
# so we do not lose any data in the model within the wavelength range of the data
nans = np.where(~np.isnan(flx_post_doppler))[0]
wav_post_doppler, flx_post_doppler = wav_mod_spectro[nans], flx_post_doppler[nans]
res_post_doppler = res_mod_spectro[nans]
else:
wav_post_doppler, flx_post_doppler, res_post_doppler = wav_mod_spectro, flx_mod_spectro, res_mod_spectro
return wav_post_doppler, flx_post_doppler, res_post_doppler
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def reddening_fct(wav: np.ndarray, flx: np.ndarray, av_picked: float) -> tuple[np.ndarray, np.ndarray]:
"""
Application of a sythetic interstellar extinction to the interpolated synthetic spectrum using the function
extinction.fm07.
Parameters
----------
wav : array
Wavelength grid of the model
flx : array
Flux of the interpolated synthetic spectrum
av_picked : float
Extinction randomly picked by the nested sampling (in mag)
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Simon Petrus
"""
if len(flx) != 0:
dered_merge = extinction.fm07(wav * 10000, av_picked, unit='aa')
flx_rd = flx * 10**(-0.4*dered_merge)
else:
flx_rd = flx
return flx_rd
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def vsini_fct(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, res_mod_obs_spectro: np.ndarray, ld_picked: float, vsini_picked: float, vsini_type: str) -> tuple[np.ndarray, np.ndarray]:
"""
Application of a rotational velocity (line broadening) to the interpolated synthetic spectrum
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum (spectroscopy)
res_mod_obs_spectro : array
Resolution of the model as a function of the wavelength grid of the data
ld_picked : float
Limb darkening randomly picked by the nested sampling
vsini_picked : float
v.sin(i) randomly picked by the nested sampling (in km.s-1)
vsini_type : str
Vsin(i) function to use
Returns
-------
array
New flux of the broadened synthetic spectrum (spectroscopy)
array
New resolution of the broadened synthetic spectrum (photometry)
Notes
-----
Authors: Allan Denis
"""
if len(flx_mod_spectro) != 0:
if vsini_picked != 0:
if vsini_type == VsiniFunction.RotBroad.value:
flx_mod_spectro_broad = vsini_fct_rot_broad(wav_mod_spectro, flx_mod_spectro, ld_picked, vsini_picked)
elif vsini_type == VsiniFunction.FastRotBroad.value:
flx_mod_spectro_broad = vsini_fct_fast_rot_broad(wav_mod_spectro, flx_mod_spectro, ld_picked, vsini_picked)
elif vsini_type == VsiniFunction.Accurate.value:
flx_mod_spectro_broad = vsini_fct_accurate(wav_mod_spectro, flx_mod_spectro, ld_picked, vsini_picked)
elif vsini_type == VsiniFunction.AccurateFast.value:
flx_mod_spectro_broad = vsini_fct_accurate_fast_rot_broad(wav_mod_spectro, flx_mod_spectro, ld_picked, vsini_picked)
else:
raise ValueError(f'Unknow rotational broadening method {vsini_type}')
# Because of the v.sini correction, the resolution of the model has been downgraded, so we update it
res_mod_obs_spectro_broad = const.c.to('km/s').value / vsini_picked * np.ones(len(res_mod_obs_spectro))
else: # vsini_picked is 0
res_mod_obs_spectro_broad, flx_mod_spectro_broad = res_mod_obs_spectro, flx_mod_spectro
else:
flx_mod_spectro_broad, res_mod_obs_spectro_broad = flx_mod_spectro, res_mod_obs_spectro
return flx_mod_spectro_broad, res_mod_obs_spectro_broad
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def vsini_fct_rot_broad(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, ld_picked: np.ndarray, vsini_picked: float) -> np.ndarray:
"""
Application of a rotation velocity (line broadening) to the interpolated synthetic spectrum using the function
extinction.fm07.
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum
ld_picked : float
Limb darkening randomly picked by the nested sampling
vsini_picked : float
v.sin(i) randomly picked by the nested sampling (in km.s-1)
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Simon Petrus
"""
# Correct irregulatities in the wavelength grid
wav_interval = wav_mod_spectro[1:] - wav_mod_spectro[:-1]
wav_to_vsini = np.arange(min(wav_mod_spectro), max(wav_mod_spectro), min(wav_interval) * 2/3)
vsini_interp = interp1d(wav_mod_spectro, flx_mod_spectro, fill_value="extrapolate")
flx_to_vsini = vsini_interp(wav_to_vsini)
# Apply the v.sin(i)
new_flx = rotBroad(wav_to_vsini, flx_to_vsini, ld_picked, vsini_picked)
vsini_interp = interp1d(wav_to_vsini, new_flx, fill_value="extrapolate")
flx_mod_spectro_broad = vsini_interp(wav_mod_spectro)
return flx_mod_spectro_broad
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def vsini_fct_fast_rot_broad(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, ld_picked: np.ndarray, vsini_picked: float) -> np.ndarray:
"""
Application of a rotation velocity (line broadening) to the interpolated synthetic spectrum using the function
extinction.fm07.
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum
ld_picked : float
Limb darkening randomly picked by the nested sampling
vsini_picked : float
v.sin(i) randomly picked by the nested sampling (in km.s-1)
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Simon Petrus
"""
# Correct irregulatities in the wavelength grid
wav_interval = wav_mod_spectro[1:] - wav_mod_spectro[:-1]
wav_to_vsini = np.arange(min(wav_mod_spectro), max(wav_mod_spectro), min(wav_interval) * 2/3)
vsini_interp = interp1d(wav_mod_spectro, flx_mod_spectro, fill_value="extrapolate")
flx_to_vsini = vsini_interp(wav_to_vsini)
# Apply the v.sin(i)
new_flx = fastRotBroad(wav_to_vsini, flx_to_vsini, ld_picked, vsini_picked)
vsini_interp = interp1d(wav_to_vsini, new_flx, fill_value="extrapolate")
flx_mod_spectro_broad = vsini_interp(wav_mod_spectro)
return flx_mod_spectro_broad
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def vsini_fct_accurate(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, ld_picked: np.ndarray, vsini_picked: np.ndarray, nr: int=50, ntheta: int=100, dif: float=0.0) -> np.ndarray:
'''
A routine to quickly rotationally broaden a spectrum in linear time.
Adapted from Carvalho & Johns-Krull 2023 https://ui.adsabs.harvard.edu/abs/2023RNAAS...7...91C/abstract
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum
ld_picked : float
Limb darkening randomly picked by the nested sampling
vsini_picked : float
v.sin(i) randomly picked by the nested sampling (in km.s-1)
nr : int
(default = 10) The number of radial bins on the projected disk
ntheta : int
(default = 100) The number of azimuthal bins in the largest radial annulus
note: the number of bins at each r is int(r*ntheta) where r < 1
dif : float
(default = 0) The differential rotation coefficient, applied according to the law Omeg(th)/Omeg(eq) = (1 - dif/2 - (dif/2) cos(2 th)).
Dif = .675 nicely reproduces the law proposed by Smith, 1994, A&A, Vol. 287, p. 523-534, to unify WTTS and CTTS.
Dif = .23 is similar to observed solar differential rotation. Note: the th in the above expression is the stellar co-latitude, not the same as the integration variable used below.
This is a disk integration routine.
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Allan Denis
'''
ns = np.copy(flx_mod_spectro)*0.0
tarea = 0.0
dr = 1./nr
for j in range(0, nr):
r = dr/2.0 + j*dr
area = ((r + dr/2.0)**2 - (r - dr/2.0)**2)/int(ntheta*r) * (1.0 - ld_picked + ld_picked * np.cos(np.arcsin(r)))
for k in range(0,int(ntheta*r)):
th = np.pi/int(ntheta*r) + k * 2.0*np.pi/int(ntheta*r)
if dif != 0:
vl = vsini_picked * r * np.sin(th) * (1.0 - dif/2.0 - dif/2.0*np.cos(2.0*np.arccos(r*np.cos(th))))
ns += area * np.interp(wav_mod_spectro + wav_mod_spectro*vl/const.c.to(u.km/u.s).value, wav_mod_spectro, flx_mod_spectro)
tarea += area
else:
vl = r * vsini_picked * np.sin(th)
ns += area * np.interp(wav_mod_spectro + wav_mod_spectro*vl/const.c.to(u.km/u.s).value, wav_mod_spectro, flx_mod_spectro)
tarea += area
flx_mod_spectro_broad = ns / tarea
return flx_mod_spectro_broad
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def vsini_fct_accurate_fast_rot_broad(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, ld_picked: float, vsini_picked: float) -> np.ndarray:
"""
Application of a rotation velocity (line broadening) to the interpolated synthetic spectrum using the Carvalho & Johns-Krull (2023) approach
Parameters
----------
wav_mod_spectro : array
Wavelength grid of the model
flx_mod_spectro : array
Flux of the interpolated synthetic spectrum
ld_picked : float
Limb darkening randomly picked by the nested sampling
vsini_picked : float
v.sin(i) randomly picked by the nested sampling (in km.s-1)
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Simon Petrus, Arthur Vigan and Allan Denis
"""
# Correct irregulatities in the wavelength grid
wav_interval = wav_mod_spectro[1:] - wav_mod_spectro[:-1]
wav_to_vsini = np.arange(min(wav_mod_spectro), max(wav_mod_spectro), min(wav_interval) * 2/3)
vsini_interp = interp1d(wav_mod_spectro, flx_mod_spectro, fill_value="extrapolate")
flx_to_vsini = vsini_interp(wav_to_vsini)
# Apply the v.sin(i)
new_flx = vsini_fct_accurate(wav_to_vsini, flx_to_vsini, ld_picked, vsini_picked, nr=10, ntheta=100, dif=0.0)
vsini_interp = interp1d(wav_to_vsini, new_flx, fill_value="extrapolate")
flx_mod_spectro_broad = vsini_interp(wav_mod_spectro)
return flx_mod_spectro_broad
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def bb_cpd_fct(wav: np.ndarray, flx: np.ndarray, distance: np.ndarray, bb_t_picked: np.ndarray, bb_r_picked: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
'''
Function to add the effect of a cpd (circum planetary disc) to the models.
Parameters
----------
wav : array
Wavelength grid of the model
flx : array
Flux of the interpolated synthetic spectrum
distance : array
Distance from the observation in pc units
bb_t_picked : float
Temperature value randomly picked by the nested sampling in K units
bb_r_picked float): Radius randomly picked by the nested sampling in units of planetary radius
Returns
-------
array
New flux of the interpolated synthetic spectrum
Notes
-----
Authors: Paulina Palma-Bifani
'''
if len(flx) > 0:
bb_t_picked *= u.K
bb_r_picked *= u.Rjup
distance *= u.pc
a = 2.0 * const.h * const.c**2
b = const.h * const.c / (wav * u.um * const.k_B * bb_t_picked)
intensity = a / ((wav * u.um)**5 * (np.exp(b) - 1.0))
flx_bb = (np.pi * bb_r_picked**2 / distance**2 * intensity).to(u.W / u.m**2 / u.micron)
# add to model flux of the atmosphere
flx_bb = flx + flx_bb.value
else:
flx_bb = flx
return flx_bb
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def fit_linear_model(components: list[np.ndarray], flx_obs: np.ndarray, err_obs: np.ndarray | None = None, bounds: tuple | None = None, fixed_coeffs: dict[int, float] | None = None) -> dict:
"""
Generic weighted linear model fitter.
Solve:
flx_obs = sum_i c_i * components
Parameters
----------
components : list[np.ndarray]
Model components M_i
flx_obs : np.ndarray
Observation
err_obs : np.ndarray | None
Error
bounds : tuple(float, float) | None
Bounds for the least sqaures
fixed_coeffs : dict(int: float)
Dictionary of fixed coeff {index: value}
Returns
-------
dict with keys:
coeffs
reconstructed
model
Notes
-----
Authors: Allan Denis
"""
n = len(components)
flx_obs = np.asarray(flx_obs)
if err_obs is None:
weights = np.ones_like(flx_obs)
else:
weights = 1.0 / err_obs**2
fixed_coeffs = fixed_coeffs or {}
free_idx = [i for i in range(n) if i not in fixed_coeffs]
# ======================
# Build weighted matrix
# ======================
A = np.column_stack(components)
A_w = A * weights[:, None]
b_w = flx_obs * weights
# ======================
# Remove fixed components
# ======================
if fixed_coeffs:
for i, val in fixed_coeffs.items():
b_w -= A_w[:, i] * val
# ======================
# Solve for free coeffs
# ======================
coeffs = np.zeros(n)
if free_idx:
A_free = A_w[:, free_idx]
if bounds is not None:
low, high = bounds
if np.ndim(low):
low = np.asarray(low)[free_idx]
high = np.asarray(high)[free_idx]
res = optimize.lsq_linear(A_free, b_w, bounds=(low, high))
coeffs_free = res.x
else:
coeffs_free, *_ = np.linalg.lstsq(A_free, b_w, rcond=None)
for i, idx in enumerate(free_idx):
coeffs[idx] = coeffs_free[i]
# ======================
# Insert fixed coeffs
# ======================
for i, val in fixed_coeffs.items():
coeffs[i] = val
# ======================
# Reconstruction
# ======================
reconstructed = np.array([coeffs[i] * components[i] for i in range(n)])
model = np.sum(reconstructed, axis=0)
return {"coeffs": coeffs, "reconstructed": reconstructed, "model": model}
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def build_linear_components(flx_mod: np.ndarray | None = None, transm: np.ndarray | None = None, flx_cont_mod: np.ndarray | None = None, star_flx_obs: np.ndarray | None = None, flx_cont_obs: np.ndarray | None = None, star_flx_cont_obs: np.ndarray | None = None, system_obs: np.ndarray | None = None, analytic: str = "yes", ck_value: float | None = None, bounds: tuple | None = None):
'''
Build linear components, fixed coefficients and bounds
for fit_linear_model().
Parameters
----------
flx_mod : np.ndarray | None
Model flux
transm : np.ndarray | None
Transmission function
flx_cont_mod : np.ndarray | None
Model continuum flux
star_flx_obs : np.ndarray | None
Stellar flux
flx_cont_obs : np.ndarray | None
Observed continuum flux
star_flx_cont_obs : np.ndarray | None
Observation star flux continuum
system_obs : np.ndarray | None
Systematics model
analytic : str
'yes' to fit for the model, 'no' to apply a constant scaling law
ck_value : float | None
Fixed scaling coefficient if scaling_mode is "fixed"
bounds : tuple | None
Bounds for the optimization
Returns
-------
components : list[np.ndarray]
Model components
fixed_coeffs : dict[int, float]
Fixed coefficients
labels : list[str]
Labels for each component
Notes
-----
Authors: Allan Denis
'''
components = []
fixed_coeffs = {}
labels = []
# ======================
# Astrophysical model
# ======================
if flx_mod is not None:
comp = flx_mod
if transm is not None:
comp = transm * comp
if flx_cont_mod is not None and star_flx_obs is not None:
speckles = star_flx_obs / star_flx_cont_obs[:, None]
mid = speckles.shape[1] // 2
comp = comp - flx_cont_mod * speckles[:, mid]
components.append(comp)
labels.append("model")
if analytic == "no":
if ck_value is None:
raise ValueError("ck_value must be provided when analytic='no'")
fixed_coeffs[0] = ck_value
# ======================
# Stellar speckles
# ======================
if star_flx_obs is not None:
speckles = star_flx_obs / star_flx_cont_obs[:, None]
for i in range(speckles.shape[1]):
components.append(speckles[:, i] * flx_cont_obs)
labels.append(f"speckle_{i}")
# ======================
# Systematics
# ======================
if system_obs is not None and system_obs.size > 0:
for i in range(system_obs.shape[1]):
components.append(system_obs[:, i])
labels.append(f"systematic_{i}")
return components, fixed_coeffs, bounds, labels
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def compute_ccf(wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, wav_obs_spectro: np.ndarray, flx_obs_spectro: np.ndarray, err_obs_spectro: np.ndarray, res_mod_spectro: np.ndarray, res_obs_spectro: np.ndarray, res_cont: float, wav_fit: str | np.ndarray, star_flx_obs_spectro: np.ndarray = np.array([]), transm_obs_spectro: np.ndarray = np.array([]), system_obs_spectro: np.ndarray = np.array([]), rv_grid: np.ndarray = np.linspace(-300, 300, 600), rv_sini_map: bool = False, normalize: bool = True):
'''
Function to compute the ccf between a template and data
Parameters
----------
wav_mod_spectro : np.ndarray
Wavelength grid of the template
flx_mod_spectro : np.ndarray
Flux of the template
wav_obs_spectro : np.ndarray
Wavelength grid of the data
flx_obs_spectro : np.ndarray
Flux of the data
err_obs_spectro : np.ndarray
Error of the data
res_mod_spectro : np.ndarray
Resolution of the template
res_obs_spectro : np.ndarray
Resolution of the data
res_cont : float
Resolution of the continuum
wav_fit : str | np.ndarray
Wavelengths used for fitting
star_flx_obs_spectro : np.ndarray
Star flux of the data
transm_obs_spectro : float | np.ndarray
Transmission
system_obs_spectro : np.ndarray
Systematics
rv_grid : np.ndarray
Grid of RV for the CCF function
rv_vsini_map : bool
Whether to use this function to compute a rv / vsini map
normalize : bool
Whether to normalize ccf
Returns
-------
ccf_norm : np.ndarray
CCF function normalized by the SNR
acf_norm : np.ndarray
ACF function rescaled and shifted at the peak of the RV
star_ccf_norm : np.ndarray
CCF function with star data
rv_peak : float
Peak of the RV
Notes
-----
Authors: Arthur Vigan and Allan Denis
'''
if not rv_sini_map and np.max(np.abs(rv_grid)) < 100:
print(f'Your grid is {rv_grid}. \nPlease, choose an RV grid going beyond [-100, 100]')
return None, None, None, None, None
ind_for_fitting = np.array([], dtype=int)
for wav_fit_cut in wav_fit.split('/'):
wmin, wmax = map(float, wav_fit_cut.split(','))
indices = np.where((wav_obs_spectro >= wmin) & (wav_obs_spectro <= wmax))[0]
ind_for_fitting = np.concatenate((ind_for_fitting, indices))
ind_for_fitting = np.sort(ind_for_fitting)
wav_obs_spectro, flx_obs_spectro, err_obs_spectro, res_obs_spectro = wav_obs_spectro[ind_for_fitting], flx_obs_spectro[ind_for_fitting], err_obs_spectro[ind_for_fitting], res_obs_spectro[ind_for_fitting]
# Estimate continuum of observed flux
flx_cont_obs_spectro = continuum_estimate(wav_obs_spectro, flx_obs_spectro, res_obs_spectro, wav_fit_cut, res_cont)
speckles = 1
star_flx_cont_obs_spectro = np.array([])
if star_flx_obs_spectro.size > 0:
star_flx_obs_spectro = star_flx_obs_spectro[ind_for_fitting]
mid_idx = star_flx_obs_spectro.shape[1] // 2
star_flx_mid = star_flx_obs_spectro[:, mid_idx]
star_flx_cont_obs_spectro = continuum_estimate(wav_obs_spectro, star_flx_mid, res_obs_spectro, wav_fit, res_cont)
speckles = star_flx_obs_spectro / star_flx_cont_obs_spectro[:, None]
if len(transm_obs_spectro) > 0:
transm_obs_spectro = transm_obs_spectro[ind_for_fitting]
else:
transm_obs_spectro = 1
if len(system_obs_spectro) > 0:
system_obs_spectro = system_obs_spectro[ind_for_fitting]
# Preprocess model template at RV = 0
flx_mod_spectro_no_rv = resolution_decreasing(wav_mod_spectro, flx_mod_spectro, res_mod_spectro, wav_obs_spectro, res_obs_spectro)
flx_cont_mod_spectro_no_rv = continuum_estimate(wav_obs_spectro, flx_mod_spectro_no_rv * transm_obs_spectro, res_obs_spectro, wav_fit, res_cont)
mid_speckles = speckles[:, speckles.shape[1] // 2] if isinstance(speckles, np.ndarray) else speckles
flx_mod_spectro_no_rv_hf = transm_obs_spectro * flx_mod_spectro_no_rv - flx_cont_mod_spectro_no_rv * mid_speckles
if star_flx_obs_spectro.size > 0:
flx_obs_spectro_hf = flx_obs_spectro - flx_cont_obs_spectro * mid_speckles
star_flx_obs_spectro_hf = star_flx_obs_spectro[:, mid_idx] - star_flx_cont_obs_spectro
else:
flx_obs_spectro_hf = flx_obs_spectro - flx_cont_obs_spectro
star_flx_obs_spectro_hf = np.array([])
# Compute CCF (parallel)
try:
with ThreadPool(processes=mp.cpu_count()) as pool:
pbar = tqdm(total=len(rv_grid), leave=False)
results = []
def update(*_):
pbar.update()
for irv in rv_grid:
task = pool.apply_async(compute_ccf_single_rv, args=(irv, wav_mod_spectro, flx_mod_spectro, flx_mod_spectro_no_rv_hf, res_mod_spectro, wav_obs_spectro, flx_obs_spectro, flx_cont_obs_spectro, flx_obs_spectro_hf, res_obs_spectro, wav_fit, res_cont, err_obs_spectro, transm_obs_spectro, star_flx_obs_spectro, star_flx_cont_obs_spectro, star_flx_obs_spectro_hf, system_obs_spectro, speckles, 0.6), callback=update)
results.append(task)
pool.close()
pool.join()
ccf, acf, ccf_star, logL = map(np.zeros, [len(rv_grid)] * 4)
for i, task in enumerate(results):
ccf[i], acf[i], ccf_star[i], logL[i] = task.get()
except Exception as e:
print(f'Parallel computation of CCF produced the following error: {e}. Trying serial computation.')
ccf, acf, ccf_star, logL = [], [], [], []
for irv in tqdm(rv_grid):
res = compute_ccf_single_rv(irv, wav_mod_spectro, flx_mod_spectro, flx_mod_spectro_no_rv_hf, res_mod_spectro, wav_obs_spectro, flx_obs_spectro, flx_cont_obs_spectro, flx_obs_spectro_hf, res_obs_spectro, wav_fit, res_cont, err_obs_spectro, transm_obs_spectro, star_flx_obs_spectro, star_flx_cont_obs_spectro, star_flx_obs_spectro_hf, system_obs_spectro, speckles, 0.6)
ccf.append(res[0])
acf.append(res[1])
ccf_star.append(res[2])
logL.append(res[3])
ccf, acf, ccf_star, logL = map(np.array, [ccf, acf, ccf_star, logL])
if not(rv_sini_map):
# Normalize CCF
mask_far = np.abs(rv_grid) > 100
mask_valid = np.abs(rv_grid) <= 150
imax = np.argmax(ccf)
rv_peak = rv_grid[imax]
if normalize:
for arr in [ccf, acf, ccf_star]:
arr -= np.mean(arr[mask_far])
# SNR normalization
acf_scale = np.max(ccf[mask_valid]) / np.max(acf)
sigma = np.sqrt(np.abs(np.nanvar(ccf[np.abs(rv_grid - rv_peak) > 100]) - np.nanvar(acf[mask_far] * acf_scale)))
max_peak = ccf[imax] / sigma
ccf = ccf / sigma
ccf_star = ccf_star / np.std(ccf_star)
acf = acf * (max_peak / np.max(acf))
print(f'Maximum peak for rv = {rv_peak:.1f} km/s (SNR = {max_peak:.1f})')
return ccf, acf, ccf_star, rv_peak, logL, ccf
return logL
# ----------------------------------------------------------------------------------------------------------------------
[docs]
def compute_ccf_single_rv(rv: float, wav_mod_spectro: np.ndarray, flx_mod_spectro: np.ndarray, flx_mod_spectro_no_rv_hf: np.ndarray, res_mod_spectro: np.ndarray, wav_obs_spectro: np.ndarray, flx_obs_spectro: np.ndarray, flx_cont_obs_spectro: np.ndarray, flx_obs_spectro_hf: np.ndarray, res_obs_spectro: np.ndarray, wav_cont: str, res_cont: np.ndarray, err_obs_spectro: np.ndarray, transm_obs_spectro: int | np.ndarray = 1, star_flx_obs_spectro: np.ndarray = np.array([]), star_flx_cont_obs_spectro: (int | np.ndarray) = 1, star_flx_obs_spectro_hf: np.ndarray = np.array([]), system_obs_spectro: np.ndarray = np.array([]), speckles: (int | np.ndarray) = 1, ld: float = 0.6) -> tuple[float, float, float]:
'''
Function to compute the correlation between template and data for a specific rv value
Parameters
----------
rv : float
rv value
vsini : float
vsini value
wav_mod_spectro : np.ndarray
Wavelength grid of the template
flx_mod_spectro : np.ndarray
Flux of the template
flx_mod_spectro_no_rv_hf : np.ndarray
High frequency content of the flux of the model at 0 rv
res_mod_spectro : np.ndarray
Resolution of the template interpolated onto the wavelength grid of the data
wav_obs_spectro : np.ndarray
Wavelength grid of the data
flx_obs_spectro : np.ndarray
The flux of the data
flx_cont_obs_spectro : np.ndarray
Continuum of the flux of the data
res_obs_spectro : np.ndarray
Resolution of the data
wav_cont : str
Wavelength used for the continuum
res_cont : float
Resolution of the continuum
err_obs_spectro : np.ndarray
Error of the flux
transm_obs_spectro : int | np.ndarray
Transmission
star_flx_obs_spectro : np.ndarray
Flux of the star
star_flx_cont_obs_spectro : int | np.ndarray
Continuum of the flux of the star
system_obs_spectro : np.ndarray
Systematics
speckles : int | np.ndarray
Speckles
Returns
-------
ccf : float
Correlation between the template and the data
acf : float
Autocorrelation between the template and itself
ccf_star : float
Correlation between the template and the star data
Notes
-----
Authors: Arthur Vigan and Allan Denis
'''
# Doppler shift + resolution match
wav_shifted, flx_shifted, res_shifted = doppler_fct(wav_mod_spectro, flx_mod_spectro, res_mod_spectro, rv)
flx_shifted = resolution_decreasing(wav_shifted, flx_shifted, res_shifted, wav_obs_spectro, res_obs_spectro)
# Continuum estimation
flx_cont = continuum_estimate(wav_obs_spectro, flx_shifted * transm_obs_spectro, res_obs_spectro, wav_cont, res_cont)
mid_idx = speckles.shape[1] // 2 if isinstance(speckles, np.ndarray) else 0
speckles_mid = speckles[:, mid_idx] if isinstance(speckles, np.ndarray) else speckles
flx_rv_hf = transm_obs_spectro * flx_shifted - flx_cont * speckles_mid
# Estimate model signal
components, fixed_coeffs, bounds, labels = build_linear_components(
flx_shifted,
transm_obs_spectro,
flx_cont,
star_flx_obs_spectro,
flx_cont_obs_spectro,
star_flx_cont_obs_spectro,
system_obs_spectro,
analytic='yes'
)
result = fit_linear_model(
components=components,
flx_obs=flx_obs_spectro,
err_obs=err_obs_spectro,
bounds=bounds,
fixed_coeffs=fixed_coeffs
)
ccf, best_model_hf = result['coeffs'][0], result['reconstructed'][0]
# ACF
acf = np.sum(flx_rv_hf * flx_mod_spectro_no_rv_hf) / (np.sqrt(np.sum(flx_rv_hf ** 2)) * np.sqrt(np.sum(flx_mod_spectro_no_rv_hf ** 2)))
# CCF with star if available
if star_flx_obs_spectro_hf.size > 0:
ccf_star = np.sum(flx_rv_hf * star_flx_obs_spectro_hf) / (np.sqrt(np.sum(flx_rv_hf ** 2)) * np.sqrt(np.sum(star_flx_obs_spectro_hf ** 2)))
else:
ccf_star = 1
# log-likelihood estimation
residuals = flx_obs_spectro_hf - best_model_hf
ki2 = np.sum((residuals / err_obs_spectro) ** 2)
logL = -0.5 * ki2 - 0.5 * np.nansum(np.log(2 * np.pi * err_obs_spectro ** 2))
return ccf, acf, ccf_star, logL