import os
import time
import json
import logging
import numpy as np
from pathlib import Path
from ForMoSA.core.errors import ForMoSAError
from ForMoSA.grid.subgrid_set import SubGridSet
from ForMoSA.core.loggings import setup_logging
from ForMoSA.config.global_config import Config_NS
from ForMoSA.nested_sampling.results import NSResults
from ForMoSA.parameter.parameter_set import ParameterSet
from ForMoSA.observation.observation_set import ObservationSet
from ForMoSA.core.enums import NestedAlgorithm, LogLikelihoodType
from ForMoSA.transform.observed import ObservedParameters, ObservedModel
# optional imports for nested sampling algorithms
try:
import nestle
except ImportError:
nestle = None
try:
import pymultinest
except ImportError:
pymultinest = None
try:
import ultranest
except ImportError:
ultranest = None
[docs]
class NestedSampling(object):
'''
ForMoSA Nested_Sampling class, which provides easy access to the parameters of the nested sampling algorithm
Parameters
----------
algorithm : NestedAlgorithm
Algorithm used for the nested sampling ('nestle', 'ultranest' or 'pymultinest')
npoints : int
Number of living points used for the nested sampling
logL_type : list[LogLikelihoodType]
List of loglikelihood types used in the nested sampling
config_NS : Config_NS
Instance of class Config_NS containing parameters of ns algorithms for each of the algorithm
observations : ObservationSet
Instance of ObservationSet
subgrids : SubGridSet
Instance of SubGridSet
parameters : ParameterSet
Instance of ParameterSet
wave_fit : list[str] | None
List of wavelength grid used for fitting
interp_method : str
Interpolation method ('linear', 'cubic', 'spline', ...)
bounds_lsq : list[tuple[float, float]] | None
List of bounds used for the least squares (used for high contrast)
logger : logging.Logger
Logger
log_level : str
Level of the Logger
Notes
-----
Authors: Allan Denis
'''
def __init__(self, algorithm: NestedAlgorithm, npoints: int, logL_type: list[LogLikelihoodType], config_NS: Config_NS, observations: ObservationSet, subgrids: SubGridSet, parameters: ParameterSet, wave_fit: list[str] | None = None, interp_method: str = 'linear', bounds_lsq: list[tuple[float, float]] | None = None, logger: logging.Logger | None=None, log_level: str='INFO'):
self._logger = logger if logger is not None else setup_logging(log_level, name='NestedSampling')
self._logL_type = logL_type
self._algorithm = algorithm.algo
self._npoints = npoints
self._parameters = parameters
self._observations = observations
self._subgrids = subgrids
self._config_NS = config_NS
self._interp_method = interp_method
self._wave_fit = wave_fit
self._bounds_lsq = bounds_lsq
self._restricted_subgrids = SubGridSet(self._subgrids.parent_grid, logger=self._logger)
self._restricted_observations = ObservationSet(logger=self._logger)
self._results = None
self._validate()
self._restricted_models_and_data()
# =================
# Representation
# =================
def __repr__(self):
return f'NestedSampling(algorithm = {self.algorithm}, npoints = {self.npoints}, status = {"fitted" if not self.results is None else "Not fitted"})'
# =================
# Properties
# =================
@property
def algorithm(self) -> str:
"""Algorithm."""
return self._algorithm
@property
def logL_type(self) -> list:
"""logL functions."""
return self._logL_type
@property
def observations(self) -> ObservationSet:
"""Set of observations."""
return self._observations
@property
def subgrids(self) -> SubGridSet:
"""Set of subgrids."""
return self._subgrids
@property
def parameters(self) -> ParameterSet:
"""Priors parameters."""
return self._parameters
@property
def config_NS(self) -> Config_NS:
"""Config_NS."""
return self._config_NS
@property
def interp_method(self) -> str:
"""Interpolation method."""
return self._interp_method
@property
def wave_fit(self) -> str:
"""Wavelengths for fitting."""
return self._wave_fit
@property
def bounds_lsq(self) -> list[float]:
"""Bounds for the Least Squares estimation."""
return self._bounds_lsq
@property
def ns_params(self) -> dict:
"""Dictionary of NS algo."""
return getattr(self.config_NS, self.algorithm.lower()).to_dict
@property
def npoints(self) -> int:
"""Nested sampling number of living points."""
return self._npoints
@property
def restricted_observations(self) -> ObservationSet:
"""Set of restricted observations according to wave_fit."""
return self._restricted_observations
@property
def restricted_subgrids(self) -> SubGridSet:
"""Set of restricted subgrids according to wave_fit."""
return self._restricted_subgrids
@property
def results(self) -> "NSResults":
"""Results."""
return self._results
@property
def logger(self) -> logging.Logger:
"""Logger."""
return self._logger
@property
def to_dict(self) -> dict:
"""Dictionary representation of NestedSampling."""
return {
'algorithm': self.algorithm,
'npoints': self.npoints,
'logLtype': [logL.loglike for logL in self.logL_type],
'config_NS': self.config_NS.to_dict,
'parameters': self.parameters.to_dict,
'wave_fit': self.wave_fit,
'interp_method': self.interp_method,
'bounds_lsq': [(str(lower), str(upper)) for lower, upper in self.bounds_lsq]
}
# =================
# Class method
# =================
[docs]
@classmethod
def from_dict(cls, data: dict, observations: ObservationSet, subgrids: SubGridSet, logger: logging.Logger | None=None, log_level: str='INFO') -> 'NestedSampling':
'''
Reconstruct an instance of NestedSampling from a dictionary of NestedSampling.
Parameters
----------
data : dict
Dictionary representation NestedSampling parameters
observations : ObservationSet
Instance of ObservationSet
subgrids : SubGridSet
Instance of SubGridSet
logger : logging.Logger
Logger
log_level : str
Level of the Logger
Returns
-------
'NestedSampling'
An instance of class NestedSampling
Notes
-----
Authors: Allan Denis
'''
logger = logger if logger is not None else setup_logging(level=log_level, name='NestedSampling')
logger.debug('Building an instance of NestedSampling from dictionary')
if not isinstance(data, dict):
raise ForMoSAError(f'Wrong type for data: {type(data)}. Expected a dictionary', logger)
algorithm, npoints, logL_type, config_NS, parameters, wave_fit, interp_method, bounds_lsq = data['algorithm'], data['npoints'], data['logLtype'], data['config_NS'], data['parameters'], data['wave_fit'], data['interp_method'], data['bounds_lsq']
logger.info(f' Found NestedSampling with {algorithm} algorithm, {npoints} living points and {len(parameters)} parameters')
algorithm = NestedAlgorithm[algorithm.upper()]
logL_type = [LogLikelihoodType[logL_type.upper()] for logL_type in data['logLtype']]
config_NS = Config_NS.from_dict(config_NS)
parameters = ParameterSet.from_dict(parameters, logger=logger)
bounds_lsq = [(float(lower), float(upper)) for lower, upper in bounds_lsq]
return cls(
algorithm=algorithm,
npoints=npoints,
logL_type=logL_type,
config_NS=config_NS,
parameters=parameters,
wave_fit=wave_fit,
interp_method=interp_method,
bounds_lsq=bounds_lsq,
observations=observations,
subgrids=subgrids,
logger=logger)
[docs]
@classmethod
def from_json(cls, path: str | os.PathLike, observations: ObservationSet, subgrids: SubGridSet, logger: logging.Logger | None=None, log_level: str='INFO') -> 'NestedSampling':
'''
Reconstruct an instance of NestedSampling from the folder containing the NestedSampling parameters.
Parameters
----------
path : str | os.PathLike
Dictionary representation NestedSampling parameters
observations : ObservationSet
Instance of ObservationSet
subgrids : SubGridSet
Instance of SubGridSet
logger : logging.Logger
Logger
log_level : str
Level of the Logger
Returns
-------
'NestedSampling'
An instance of class NestedSampling
Notes
-----
Authors: Allan Denis
'''
logger = logger if logger is not None else setup_logging(level=log_level, name='NestedSampling')
logger.debug(f'Building instance of NestedSampling from the path {path}')
# Initial checking
if not isinstance(path, (str, os.PathLike)):
raise ForMoSAError(f' Wrong type for path: {type(path)}. Expected a str or os.PathLike', logger)
NS_params_file = Path(path) / 'NS_params' / 'NS_params.json'
if not NS_params_file.exists():
raise ForMoSAError(f'{NS_params_file} does not exist', logger)
with open(NS_params_file, "r") as f:
data = json.load(f)
ns = cls.from_dict(data, observations = observations, subgrids = subgrids, logger = logger)
results_file = Path(path) / 'NS_results' / f'results_{ns.algorithm}.json'
if not results_file.exists():
ns.logger.warning(f'{results_file} does not exist. Cannot load the results of the Nested Sampling', ns.logger)
else:
ns.load_results(path)
return ns
# =================
# Method
# =================
def _validate(self) -> None:
'''
Validation for NestedSampling.
Notes
-----
Authors: Allan Denis
'''
for name, instance in zip(['logL_type', 'observations', 'subgrids', 'parameters', 'config_NS', 'wave_fit', 'interp_method', 'bounds_lsq'], [list, ObservationSet, SubGridSet, ParameterSet, Config_NS, list, str, list]):
if not isinstance(getattr(self, name), instance):
raise ForMoSAError(f'Wrong type for {name}: {type(getattr(self, name))}. Expected {instance}', self.logger)
for name, value in zip(['logL_type', 'wave_fit', 'subgrids'], [self.logL_type, self.wave_fit, self.subgrids.subgrids]):
if len(value) != self.observations.n_observations:
raise ForMoSAError(f'Invalid length for {name}. Expected {self.observations.n_observations}', self.logger)
for bound in self.bounds_lsq:
if not isinstance(bound, tuple):
raise ForMoSAError(f'bounds_lsq must only contain tuples. Found a {type(bound)}', self.logger)
if len(bound) != 2:
raise ForMoSAError(f'bounds_lsq must only contain tuples of length 2. Found a tuple of length {len(bound)}', self.logger)
for b in bound:
if not isinstance(b, float):
raise ForMoSAError(f'Wrong type detected in {bound}: {type(b)}. Expected a float', self.logger)
valid_algorithms = [algo.value for algo in NestedAlgorithm]
if self.algorithm not in valid_algorithms:
raise ForMoSAError(f'{self.algorithm} is not supported. Choose from {valid_algorithms}', self.logger)
if self.algorithm == NestedAlgorithm.NESTLE and nestle is None:
raise ForMoSAError('nestle is not installed', self.logger)
if self.algorithm == NestedAlgorithm.ULTRANEST and ultranest is None:
raise ForMoSAError('ultranest is not installed', self.logger)
if self.algorithm == NestedAlgorithm.PYMULTINEST and pymultinest is None:
raise ForMoSAError('pymultinest is not installed', self.logger)
valid_logL_type = [logL for logL in LogLikelihoodType]
for i, ll in enumerate(self.logL_type):
if ll not in valid_logL_type:
raise ForMoSAError(f'Invalid logL_type ({ll}). Choose from {valid_logL_type}', self.logger)
def _restricted_models_and_data(self) -> None:
'''
Create restricted versions of subgrids and observations according to wave_fit.
These restricted instances are stored internally and reused during the nested sampling.
Notes
-----
Authors: Allan Denis
'''
self.logger.info(f' Restrict subgris and observations to windows {self.wave_fit}')
for index in range(self.observations.n_observations):
# Restrict grid and observation to wave_fit
observation = self.observations.observations[index]._restricted_observation(self.wave_fit[index], print_logger=False)
if len(self.subgrids.subgrids[index].wave) != len(self.observations.observations[index].wave):
extension = 0.05
else:
extension = 0.0
subgrid = self.subgrids.subgrids[index]._restricted_grid(self.wave_fit[index], print_logger=False, extension=extension)
# Add subgrid and observation to restricted sets
self._restricted_subgrids.add_subgrid(subgrid)
self._restricted_observations.add_observation(observation)
[docs]
def run(self, results_path: str | os.PathLike) -> None:
'''
Method to run the nested sampling algorithm using the model, observation and nested sampling parameters.
Parameters
----------
results_path : str | os.PathLike
Path of the output
Notes
-----
Authors: Simon Petrus, Matthieu Ravet and Allan Denis
'''
if not isinstance(results_path, (str, os.PathLike)):
raise ForMoSAError(f'Wrong type for results_path: {type(results_path)}. Expected a string or os.PathLike', self.logger)
n_free_parameters = self.parameters.n_free_parameters
loglike_gp = lambda theta: self.loglike(theta)
prior_transform_gp = lambda theta: self.prior_transform(theta)
time_before_ns = time.time()
self.logger.debug('Running Nested Sampling')
self.logger.info(f' Algorithm for the Nested Sampling: {self.algorithm}')
path = Path(results_path)
self.logger.info(f' Creating directory {path}')
path.mkdir(exist_ok=True, parents=True)
self.logger.info(f' Summary for the Nested Sampling parameters: \n {self.parameters.summary(as_dataframe=True)}')
# Algorithms
if self.algorithm == NestedAlgorithm.NESTLE.algo:
if nestle is None:
msg = 'Nestle is not installed. Please install it to use the nestle algorithm.'
self._logger.error(msg)
raise ForMoSAError(msg)
res = nestle.sample(loglike_gp, prior_transform_gp, n_free_parameters,
npoints=self.npoints,
**self.ns_params,
callback=nestle.print_progress)
self._results = NSResults.from_nestle(res, self.parameters.free_titles)
elif self.algorithm == NestedAlgorithm.PYMULTINEST.algo:
if pymultinest is None:
msg = 'Pymultinest is not installed. Please install it to use the MultiNest algorithm.'
self._logger.error(msg)
raise ForMoSAError(msg)
output_path = f"{results_path}/pymultinest"
if not Path(output_path).exists():
Path(output_path).mkdir(exist_ok=True, parents=True)
# MultiNest v3.10 can fail with long output basenames; run from the
# target folder and use a short basename to avoid file-open conflicts.
pymultinest_dir = os.path.join(str(results_path), 'pymultinest')
prev_cwd = os.getcwd()
try:
os.chdir(pymultinest_dir)
res = pymultinest.solve(LogLikelihood=loglike_gp,
Prior=prior_transform_gp,
n_dims=n_free_parameters,
n_live_points=self.npoints,
outputfiles_basename='RAW_',
**self.ns_params)
finally:
os.chdir(prev_cwd)
self._results = NSResults.from_pymultinest(output_path, self.parameters.free_titles)
elif self.algorithm == NestedAlgorithm.ULTRANEST.algo:
if ultranest is None:
msg = 'Ultranest is not installed. Please install it to use the UltraNest algorithm.'
self._logger.error(msg)
raise ForMoSAError(msg)
sampler = ultranest.ReactiveNestedSampler(param_names=[p.title for p in self.parameters.free_parameters],
loglike=loglike_gp,
transform=prior_transform_gp,
log_dir=str(results_path) + '/ultranest/',
**self.ns_params['ReactiveNS'])
res = sampler.run(min_num_live_points=self.npoints, **self.ns_params['runNS'])
self._results = NSResults.from_ultranest(res, self.parameters.free_titles)
else:
raise ForMoSAError(f'Unknown algorithm: {self.algorithm}. Chose amongst {[algo.value for algo in NestedAlgorithm]}')
self._logger.info(f' Time spent: {time.time() - time_before_ns}')
self.results.summary()
[docs]
def loglike(self, theta: np.ndarray[float]) -> float:
'''
Compute the loglikelihood for given parameters of the nested sampling.
Parameters
----------
theta : np.ndarray[float]
Values randomly drawn by the Nested Sampling
Returns
-------
logL : float
Final loglikelihood
'''
observed_models = self.build_models_from_theta(theta)
return self._compute_loglikelihood(observed_models)
[docs]
def build_models_from_theta(self, theta: np.ndarray[float]) -> list[ObservedModel]:
'''
Build the models from the values of the free parameters drawn by the Nested Sampling
Parameters
----------
theta : np.ndarray[float]
List of values picked by the Nested Sampling for the free parameters
Returns
-------
list[ObservedModel]
List of instances of class ObservedModel
Notes
-----
Authors: Allan Denis
'''
observed_models = []
for index in range(self.observations.n_observations):
# Get parameters associated to the observation
observed_params = self._build_params_for_obs(theta, index)
# Restrict grid and observation to wave_fit
subgrid = self.restricted_subgrids.subgrids[index]
observation = self.restricted_observations.observations[index]
observed_models.append(subgrid._build_model_from_params(observed_params, observation, interp_method = self.interp_method, bounds_lsq = self.bounds_lsq[index]))
return observed_models
def _compute_loglikelihood(self, observed_models: list[ObservedModel]) -> float:
'''
Compute loglikelihood.
Parameters
----------
observed_models : list[ObservedModel]
List of instances of class ObservedModel
Returns
-------
float
loglikelihood value
Notes
-----
Authors: Allan Denis
'''
logL = 0
for index in range(self.observations.n_observations):
subgrid = self.subgrids.subgrids[index]._restricted_grid(self.wave_fit[index], print_logger=False)
observation = self.observations.observations[index]._restricted_observation(self.wave_fit[index], print_logger=False)
# Compute model from the parameters
logL += subgrid._compute_loglike_for_obs(observed_models[index], observation, self.logL_type[index], interp_method = self.interp_method, bounds_lsq = self.bounds_lsq[index])
return logL
def _build_params_for_obs(self, free_values: np.ndarray[float], obs_index: int) -> ObservedParameters:
'''
Build a list of free + fixed parameters relevant to a given observation.
Parameters
----------
free_values : np.ndarray[float]
List of transformed values for the free parameters
obs_index : int
Index of the observation from which we want the parameter values
Return
------
ObservedParameters
Intance of class ObservedParameters containing dictionary of parameters values (free + fixed) associated to the observation index
Notes
-----
Authors: Allan Denis
'''
if len(free_values) != self.parameters.n_free_parameters:
raise ForMoSAError("Invalid free_values length", self.logger)
params = {}
# Convert numpy array to list of Python floats to avoid numpy.float64 scalars
free_values_list = [float(v) for v in free_values]
free_iter = iter(free_values_list)
subgrid = self.restricted_subgrids.subgrids[obs_index]
for p in self.parameters.parameters:
# Check if this parameter applies to the observation type
if p.kind not in subgrid.relevant_parameter_kinds: # We ignore this parameter if it does not apply to the observation
continue
# Global parameters are always included
if not p.is_local:
params[p] = p.prior.value if p.is_fixed else next(free_iter)
# Local parameters are included only if obs_index matches
elif p.is_local and obs_index in p.obs_index:
params[p] = p.prior.value if p.is_fixed else next(free_iter)
return ObservedParameters(params)
[docs]
def save_results(self, results_path: str | os.PathLike, **kwargs) -> None:
'''
Method to save the results to the path results_path
Parameters
----------
results_path : str | os.PathLike
Path to save the results to
**kwargs = Additional parameters (wav_fit, interp_method, bounds_lsq)
Notes
-----
Authors: Allan Denis
'''
self._logger.info(' Saving results')
results_file = Path(results_path) / 'NS_results' / f'results_{self.algorithm}.json'
NS_params_file = Path(results_path) / 'NS_params' / 'NS_params.json'
# NS results
self.logger.debug(f'Saving results to path {results_file}>')
results_file.parent.mkdir(exist_ok=True, parents=True)
with open(results_file, 'w') as f:
json.dump(self.results.to_dict, f, indent=4)
# NS params
self.logger.debug(f'Saving NS paramrs to path {NS_params_file}')
NS_params_file.parent.mkdir(exist_ok=True, parents=True)
with open(str(NS_params_file), 'w') as f:
json.dump(self.to_dict, f, indent=4)
[docs]
def load_results(self, results_path: str | os.PathLike) -> None:
'''
Method to load the results from the path results_paths
Parameters
----------
results_path : str | os.PathLike
Path to save the results to
Notes
-----
Authors: Allan Denis
'''
results_path = Path(results_path) / 'NS_results'
self.logger.debug(f'Loading results from path {results_path}')
results_file = results_path / f'results_{self.algorithm}.json'
if not(results_file.exists()):
raise ForMoSAError(f'<{results_file} does not exist. Please make sure to use an existing result file>', self.logger)
self._logger.debug(f'< load {results_file}')
with open(results_file, 'r') as f:
results = json.load(f)
self._results = NSResults.from_dict(results)