Source code for ForMoSA.filter.filter

import os
import logging
import astropy
import numpy as np
import astropy.units as u
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from astroquery.svo_fps import SvoFps
from matplotlib.axes._axes import Axes

from pathlib import Path
from astropy.io import fits
from ForMoSA.core.errors import ForMoSAError
from ForMoSA.core.enums import WavelengthUnit
from ForMoSA.core.loggings import setup_logging
from astropy.table import Table, MaskedColumn, Column
from ForMoSA.core.config import  MainPlotConfig, MAIN_PLOT, PhotometricPlotConfig
import ForMoSA.core.config as config

[docs] class PhotometryFilter(object): ''' PhotometryFilter class Defining a filter. The faclity, instrument and filter_id has to be under the right format (see https://svo2.cab.inta-csic.es/theory/fps/ for more details). Parameters ---------- facility : str Name of the facility ('Paranal', 'Keck', 'JWST', ...) instrument : str Name of the instrument ('SPHERE', 'NIRC2', 'NIRCam', ...) filter_id : str ID of the filter ('IRDIS_B_H', 'Lp', F410M) filter_path : str | os.PathLike Path where to save the filter data log_level : str Level of the logger logger : logging.Logger Logger display_unit : WavelengthUnit Unit of the wavelength to display Notes ----- Authors: Allan Denis ''' def __init__(self, facility: str, instrument: str, filter_id: str, filter_path: str | os.PathLike | None = None, log_level: str = 'info', logger: logging.Logger | None = None, display_unit: WavelengthUnit = WavelengthUnit.MICROMETER): if logger == None: self._logger = setup_logging(level=log_level, name=__name__) else: self._logger = logger self._facility = facility self._instrument = instrument self._filter_id = filter_id self._filter_path = filter_path if filter_path is not None else config.FILTER_PATH self._data = [] self._medata = [] self._native_unit = WavelengthUnit.ANGSTROM self._display_unit = display_unit self._validate() self._svo_filter_trans() # ================================================ # Representation # ================================================ def __repr__(self) -> str: return f'<PhotometryFilter, name={self.name}, central wavelength = {self.central_wavelength}, range = [{self.wavelength_min} - {self.wavelength_max}] {self.unit}>' def __format__(self) -> str: return self.__repr__() def __getstate__(self): state = self.__dict__.copy() level_int = self._logger.level state['__pickle_log_level'] = 'OFF' if level_int >= 100 else logging.getLevelName(level_int) state['__pickle_log_name'] = self._logger.name.removeprefix('ForMoSA.') state['_logger'] = None return state def __setstate__(self, state): log_level = state.pop('__pickle_log_level', 'INFO') log_name = state.pop('__pickle_log_name', __name__) self.__dict__.update(state) self._logger = setup_logging(level=log_level, name=log_name) # =============================================== # Properties # =============================================== # ------------------ # General properties # ------------------ @property def logger(self) -> logging.Logger: """Logger.""" return self._logger @property def facility(self) -> str: """Facility for the filter (e.g. 'Paranal', 'Keck', 'JWST', ...).""" return self._facility @property def instrument(self) -> str: """Instrument used of the filter (e.g. 'SPHERE', 'NIRC2', 'NIRCam', ...).""" return self._instrument @property def filter_id(self) -> str: """ID of the filter (e.g. 'IRDIS_B_H', 'Lp', 'F410M', ...).""" return self._filter_id @property def filter_path(self) -> Path: return Path(self._filter_path) @filter_path.setter def filter_path(self, new_path: str | os.PathLike) -> None: self._filter_path = new_path @property def name(self) -> str: """Full name of the filter (e.g. 'Paranal/SPHERE.IRDIS_B_H', 'Keck/NIRC2.Lp', 'JWST/NIRCam.F410M').""" return self.facility + '/' + self.instrument + '.' + self.filter_id @property def data(self) -> astropy.table.table.Table: """Table containing the data of the filter (wavelength, transmission).""" return self._data @property def unit(self) -> u.core.PrefixUnit: """Unit to use for the wavelength.""" return self._display_unit.unit @property def native_unit(self) -> u.core.PrefixUnit: """Native unit of the wavelength (Angstrom).""" return self._native_unit.unit @native_unit.setter def native_unit(self, new_unit: WavelengthUnit) -> None: """Setter for native unit of the wavelength.""" self._native_unit = new_unit self._validate() @property def metadata(self) -> astropy.table.table.Table: """Table containing the metadata of the filter.""" return self._metadata @property def wavelength(self) -> u.quantity.Quantity: """Wavelength grid of the filter.""" return ((self.data['Wavelength'].data.filled(np.nan) * self.native_unit).to(self.unit)).value @property def transmission(self) -> np.ndarray: """Transmission of the filter.""" return self.data['Transmission'].data.filled(np.nan) @property def fwhm(self) -> u.quantity.Quantity: """FWHM.""" return ((self.metadata['FWHM'][0] * self.native_unit).to(self.unit)).value @property def zero_point(self) -> np.float64: """Zero point.""" return self.metadata['ZeroPoint'][0] @property def Mag0(self) -> np.float64: """Magnitude 0.""" return self.metadata['Mag0'][0] @property def zero_point_type(self) -> str: """Zero point type ('Pogson', 'Asinh', 'Linear', see https://www.ivoa.net/documents/Notes/SVOFPS/NOTE-SVOFPS-1.0.20121015.pdf).""" return self.metadata['ZeroPointType'][0] @property def softening_parameter(self) -> str: """Softening parameter (see https://www.ivoa.net/documents/Notes/SVOFPS/NOTE-SVOFPS-1.0.20121015.pdf).""" return self.metadata['AsinhSoft'][0] @property def folder(self) -> Path: """Folder of the filter.""" folder = self.filter_path / self.facility / self.instrument if not folder.exists(): self._logger.info(f" {folder} does not exist. Creating it") folder.mkdir(parents=True, exist_ok=True) return folder @property def file_path(self) -> Path: """Path of the filter.""" return self.folder / f"{self.filter_id}.fits" # --------------------------------------------------------------------------------------------- # Properties related to wavelength # see https://www.ivoa.net/documents/Notes/SVOFPS/NOTE-SVOFPS-1.0.20121015.pdf for more details # --------------------------------------------------------------------------------------------- @property def mean_wavelength(self) -> float: r"""Mean integrated wavelength ($\lambda_{mean} = \frac{int_{\lambda} \lambda T(\lambda) d\lambda}{T(\lambda) d\lambda}$).""" return ((self.metadata['WavelengthMean'][0] * self.native_unit).to(self.unit)).value @property def central_wavelength(self) -> float: """Central wavelength between the 2 wavelengths used to compute the FWHM.""" return ((self.metadata['WavelengthCen'][0] * self.native_unit).to(self.unit)).value @property def wavelength_eff(self) -> float: r"""Mean integrated wavelength with Vega spectrum ($\lambda_{ref} = \frac{\int_{\lambda} \lambda T(\lambda) Vega(\lambda) d\lambda}{T(\lambda) Vega(\lambda) d\lambda}$).""" return ((self.metadata['WavelengthEff'][0] * self.native_unit).to(self.unit).value) @property def peak_wavelength(self) -> float: """Wavelength corresponding to the maximum of transmission.""" return ((self.metadata['WavelengthPeak'][0] * self.native_unit).to(self.unit)).value @property def pivot_wavelength(self) -> float: r"""Wavelength computed as \sqrt{\frac{\lambda T(\lambda) d\lambda}{T(\lambda) d\lambda / \lambda}}.""" return ((self.metadata['WavelengthPivot'][0] * self.native_unit).to(self.unit)).value @property def photon_wavelength(self) -> float: r"""Photon distribution based effective wavelength ($\lambda_{phot} = \frac{\int_{\lambda} \lambda^2 T(\lambda) Vega(\lambda) d\lambda}{\lambda T(\lambda) Vega(\lambda) d\lambda}$).""" return ((self.metadata['WavelengthPhot'][0] * self.native_unit).to(self.unit)).value @property def wavelength_min(self) -> float: """Minimum wavelength with transmission > 1% of maximum transmission.""" return ((self.metadata['WavelengthMin'][0] * self.native_unit).to(self.unit)).value @property def wavelength_max(self) -> float: """Maximum wavelength with transmission > 1% of maximum transmission.""" return ((self.metadata['WavelengthMax'][0] * self.native_unit).to(self.unit)).value @property def wavelength_range(self) -> tuple[float, float]: """Wavelength range.""" return self.wavelength_min, self.wavelength_max @property def effective_width(self) -> float: r"""Equivalent to the width of a rectangle with height equal to maximum transmission and with the same area that the one covered by the filter transmission curve ($Width_{eff} = \frac{T(\lambda) d\lambda}{Max(T(\lambda))}$).""" return ((self.metadata['WidthEff'][0] * self.native_unit).to(self.unit)).value @property def width(self) -> np.ndarray[float, float]: """Width of the filter.""" return np.array([self.central_wavelength - self.wavelength_min, self.wavelength_max - self.central_wavelength])[:,np.newaxis] # =============================================== # Class method # =============================================== @classmethod def _from_filter_name(cls, filter_name: str, filter_path: str | os.PathLike | None = None, logger: logging.Logger | None = None, log_level: str = 'INFO', display_unit: WavelengthUnit = WavelengthUnit.MICROMETER) -> 'PhotometryFilter': ''' Build an instance of Filter from the filter_name. filter_name must be under the format facility/instrument.filter_id (e.g. 'Keck/NIRC2.Lp') Examples -------- >>> Filter = PhotometryFilter.from_filter_name('Keck/NIRC2.Lp') Parameters ---------- filter_name : str Full name of the filter Returns ------- 'PhotometryFilter' Instance of PhotometryFilter Notes ----- Authors: Allan Denis ''' facility, instrument, filter_id = filter_name.split('/')[0], filter_name.split('/')[1].split('.')[0], filter_name.split('/')[1].split('.')[1] return cls(facility=facility, instrument=instrument, filter_id=filter_id, filter_path=filter_path, logger=logger, log_level=log_level, display_unit=display_unit) # =============================================== # Methods # =============================================== def _validate(self) -> None: ''' Validate the filter data. Notes ----- Authors: Allan Denis ''' if not isinstance(self._filter_path, (str, os.PathLike)): raise ForMoSAError(f"<Wrong type for filter_path: {type(self._filter_path)}. Expected str or os.PathLike>", self.logger) if not isinstance(self._facility, str): raise ForMoSAError(f"<Wrong type for facility: {type(self._facility)}. Expected str>", self.logger) if not isinstance(self._instrument, str): raise ForMoSAError(f"<Wrong type for instrument: {type(self._instrument)}. Expected str>", self.logger) if not isinstance(self._filter_id, str): raise ForMoSAError(f"<Wrong type for filter_id: {type(self._filter_id)}. Expected str>", self.logger) if not isinstance(self._native_unit, WavelengthUnit): raise ForMoSAError(f"<Wrong type for native_unit: {type(self._native_unit)}. Expected a WavelengthUnit>", self.logger) if not isinstance(self._display_unit, WavelengthUnit): raise ForMoSAError(f"<Wrong type for display_unit: {type(self._display_unit)}. Expected a WavelengthUnit>", self.logger) def _svo_filter_trans(self) -> None: ''' Get the filter data either from a local FITS file or from the Spanish Virtual Observatory's Filter Profile Service. Notes ----- Authors: Allan Denis ''' try: self._logger.debug(f"<Look for the filter {self.name} in the folder {self.folder}>") self._load_filter_data_from_fits() except ForMoSAError as e: try: self._logger.info(f"<Recovering filter {self.name} in the folder {self.folder} produced the following error: {e}. Looking in the Spanish Virtual Observation's FIlter Profile Service>") self._load_filter_data_from_svo() self._logger.debug(f"<Saving the filter {self.name} to the folder {self.folder}>") self._save_filter() except IndexError: raise ForMoSAError(f"<No filter found for requested SVO Filter {self.name}>", self.logger) except Exception as e: raise ForMoSAError(e, self.logger) def _plot_transmission_curve(self, fig: Figure | None = None, ax: Axes | None = None, plot_config: PhotometricPlotConfig = PhotometricPlotConfig(), main_plot_config: MainPlotConfig = MAIN_PLOT) -> None: ''' Method to plot the transmission curve Parameters ---------- figure : matplotlib.figure.Figure Figure (used to overplot on an existing figure) ax : matplotlib.axes._axes.Axes Ax (used to overplot on an existing ax) plot_config : PhotometricPlotConfig Instance of class PhotometricPlotConfig main_plot_config : MainPlotConfig Instance of class MainPlotConfig Returns ------- fig : matplotlib.figure.Figure Updated figure ax : matplotlib.axes._axes.Axes Updated ax Notes ----- Authors: Allan Denis ''' self._logger.info(f" Plotting transmission curve of filter {self.name}") # -------------------------------------------------- # Figure / axes creation # -------------------------------------------------- if ax is None: fig, ax = plt.subplots(figsize=main_plot_config.figsize) ax.set_xlabel(f'Wavelength ({self.unit})') ax.set_ylabel('Transmission') if plot_config.label_filter: ax.plot(self.wavelength, self.transmission, color = plot_config.color, label = f'{self.name}') ax.legend(fontsize=plot_config.legend_fontsize) else: ax.plot(self.wavelength, self.transmission, color = plot_config.color) return fig, ax def _set_unit(self, unit: WavelengthUnit) -> None: ''' Method to set the unit used for the wavelength Parameters ---------- unit : WavelengthUnit unit used (micrometer', 'nanometer', 'angstrom') Notes ----- Authors: Allan Denis ''' if not isinstance(unit, WavelengthUnit): raise ForMoSAError(f"<Unit must be a WavelengthUnit Enum, got {type(unit)}>", self.logger) self._logger.debug(f"<Convert the unit used for Filter {self.name} from {self.native_unit} to {unit.unit}>") self._display_unit = unit self._validate self._logger.info(f" Setting wavelength unit to {unit.unit} for filter {self.name} to {self.unit}") def _save_filter(self) -> None: ''' Save filter data and metadata into a FITS file. Notes ----- Authors: Allan Denis ''' path = self.file_path # ========================== # PRIMARY HDU # ========================== primary_hdu = fits.PrimaryHDU() hdr = primary_hdu.header hdr['FACILITY'] = self.facility hdr['INSTR'] = self.instrument hdr['FILTERID'] = self.filter_id hdr['WAVUNIT'] = self.native_unit.to_string() hdr['ORIGIN'] = 'SVO FPS' # ========================== # TRANSMISSION TABLE # ========================== wavelength = self.data['Wavelength'] transmission = self.data['Transmission'] cols = [ fits.Column( name='Wavelength', array=wavelength, format='D', unit=self.native_unit.to_string() ), fits.Column( name='Transmission', array=transmission, format='D' ) ] trans_hdu = fits.BinTableHDU.from_columns( cols, name='TRANSMISSION' ) # ========================== # METADATA TABLE # ========================== new_meta = Table() for col in self.metadata.colnames: data = self.metadata[col] if data.dtype.kind in ("U", "S", "O"): maxlen = max(len(str(v)) for v in data if v is not None) new_meta[col] = Column( np.array(data, dtype=f'U{maxlen}') ) else: new_meta[col] = data meta_hdu = fits.BinTableHDU( new_meta, name='METADATA' ) # ========================== # WRITE FILE # ========================== hdul = fits.HDUList([primary_hdu, trans_hdu, meta_hdu]) hdul.writeto(path, overwrite=True) self._logger.info(f"<Filter {self.name} saved to path {path}>") def _load_filter_data_from_svo(self) -> None: ''' Method to Query filter data in the Spanish Virtual Observatory's Filter Profile Service Notes ----- Authors: Mickael Bonnefoy and Allan Denis ''' data = SvoFps.get_transmission_data(self.name) # Some facilities (e.g. 2MASS) expose valid filter IDs but do not return # metadata for a Facility+Instrument query. Fall back to Facility only. try: metadata = SvoFps.get_filter_list(facility=self.facility, instrument=self.instrument) except IndexError: self._logger.warning( f"No SVO metadata for Facility={self.facility} and Instrument={self.instrument}. " "Retrying with Facility only." ) metadata = SvoFps.get_filter_list(facility=self.facility, instrument=None) metadata = metadata[metadata['filterID'] == self.name] if len(metadata) == 0: raise IndexError(f"No metadata row found in SVO for filter {self.name}") SvoFps.clear_cache() self._data = data self._metadata = metadata self._logger.info(f" Filter data for {self.name} successfully found") def _load_filter_data_from_fits(self) -> None: """ Method to load filter data from the fits file. Notes ----- Authors: Allan Denis """ # Convert to Path if it's a string fits_path = self.file_path if not fits_path.exists(): raise ForMoSAError(f"File {fits_path} does not exist", self.logger) # Open the FITS file self._logger.debug(f"Open fits file {fits_path}") with fits.open(fits_path) as hdul: # ========================== # PRIMARY HDU # ========================== primary_hdu = hdul[0] primary_header = primary_hdu.header # Extract header information self._facility = primary_header.get('FACILITY') self._instrument = primary_header.get('INSTR') self._filter_id = primary_header.get('FILTERID') Unit = primary_header.get('WAVUNIT') self._native_unit = WavelengthUnit[Unit.lower()] # ========================== # TRANSMISSION TABLE (HDU 1) # ========================== trans_hdu = hdul[1] transmission_data = trans_hdu.data wavelength, transmission = transmission_data['Wavelength'], transmission_data['Transmission'] # Mask transmission values if necessary (for example, to mask invalid values like NaN or zeros) wavelength_column = MaskedColumn(wavelength, name='Wavelength', unit=self.native_unit, length=len(wavelength)) transmission_column = MaskedColumn(transmission, name='Transmission', unit='', length=len(transmission)) transmission_data = Table([wavelength_column, transmission_column]) # Extract wavelength and transmission data self._data = transmission_data # ========================== # METADATA TABLE (HDU 2) # ========================== meta_hdu = hdul[2] self._metadata = Table(meta_hdu.data) self._logger.info(f" Filter data for {self.name} successfully found")