Source code for gammapy_plugin.gammapy_like

import logging
from typing import TYPE_CHECKING

import numpy as np
from astromodels.core.model import Model
from astromodels.functions.priors import Truncated_gaussian
from gammapy.datasets import Dataset, Datasets
from gammapy.modeling.models import DatasetModels, ModelBase, Models
from threeML.plugin_prototype import PluginPrototype

from gammapy_plugin.converter import AstromodelConverter
from gammapy_plugin.utils.gammapy_parser import (
    parameter_to_gammapy_dict,
    parse_gammapy_model,
)

if TYPE_CHECKING:
    from threeML.analysis_results import _AnalysisResults


__all__ = ["GammapyLike"]

log = logging.getLogger(__name__)

__instrument_name = "gammapy"


[docs] class GammapyLike(PluginPrototype): """A plugin for including Gammapy datasets.""" def __new__(cls, *args, **kwargs) -> PluginPrototype: instance = object.__new__(cls) return instance def __init__(self, name: str, **kwargs) -> None: """ :param name: Name of the plugin :type name: str """ nuisance_parameters = kwargs.get("nuisance_parameters", {}) super(GammapyLike, self).__init__(name, nuisance_parameters=nuisance_parameters) self._frame = kwargs.get("frame", "icrs") self._sources = kwargs.get("sources", None) self._nuisance_mapping = {} self._background_models = kwargs.get("background_models", {}) self._background_models_mapper = {} self._nuisance_parameters_dicts = {} if len(self._background_models.keys()) > 0: self._parse_background_models()
[docs] def set_datasets( self, datasets: Dataset | Datasets | list[Dataset], mode: str = "individual", stacked_name: str = "stacked", ) -> None: """Set the Gammapy Dataset. :param datasets: list of Gammapy datasets or a single Dataset object :param mode: individual or stacked - defaults to individual, stacked stacks the passed datasets :param stacked_name: name of the stacked datasset if mode is stacked """ if mode not in [ "individual", "stacked", ]: raise ValueError("mode needs to be individual or stacked") if isinstance(datasets, list): self._datasets = Datasets() for d in datasets: self._datasets.append(d) if mode == "stacked": self._datasets = Datasets( self._datasets.stack_reduce(name=stacked_name) ) elif isinstance(datasets, Datasets): self._datasets = datasets if mode == "stacked": self._datasets = Datasets( self._datasets.stack_reduce(name=stacked_name) ) elif isinstance(datasets, Dataset): self._datasets = Datasets(datasets) if mode == "stacked": log.info("Only using a single dataset - can not stack that") else: msg = "datasets has to be list of Dataset," msg += " a single Datasets or Dataset object" raise TypeError(msg)
[docs] def set_sources(self, sources: list | str = None) -> None: """ Set the sources to be used by this plugin - No need to specify bkg models :param sources: Source(s) to be used in the analysis defaults to all :type sources: list of str or str """ if isinstance(sources, list): self._sources = sources elif isinstance(sources, str): self._sources = [sources] elif sources is None: self._sources = None else: raise ValueError("")
[docs] def set_model( self, likelihood_model: Model, converted_model: AstromodelConverter = None, ) -> None: """Set the model to be used in the joint minimization. :param likelihood_model: astromodels model :param converted_model: converted astromodels :type likelihood_model: Model :type converted_model: AstromodelConverter """ if self._sources is None: log.info( "If you want to specify sources for this Plugin you MUST do so before" ) else: log.debug(f"Will use {self._sources} for this plugin") self._likelihood_model: Model = likelihood_model if converted_model is not None: self._likelihood_model_converted: AstromodelConverter = converted_model elif hasattr(self, "_likelihood_model_converted"): pass else: self._likelihood_model_converted: AstromodelConverter = AstromodelConverter( model=self._likelihood_model, frame=self._frame ) self._update_gammapy_model_list() self._assign_models()
def _update_gammapy_model_list(self) -> Models: """Update the list of gammapy models.""" if hasattr(self, "_likelihood_model_converted"): if self._sources is not None: tmp2 = [ x for x in self._likelihood_model_converted.gammapy_models if x.name in self._sources ] tmp = [*tmp2] else: tmp2 = [x for x in self._likelihood_model_converted.gammapy_models] tmp = [*tmp2] else: tmp = [] tmp2 = [] self._global_models = Models(tmp2) if hasattr(self, "_background_models"): for m in self._background_models.values(): tmp.append(m) self._gammapy_model = Models(tmp)
[docs] def set_background_models( self, bkg_model: ModelBase | list | Models | DatasetModels ) -> None: """Set the gammapy background models (e.g. FoVBackgroundModel) :param bkg_model: Background model(s) :type bkg_model: ModelBase or list of ModelBase or Models or DatasetModels.""" if isinstance(bkg_model, ModelBase): bkg_model = [bkg_model] else: if not isinstance(bkg_model, (list, Models, DatasetModels)): raise TypeError( "either pass a singular gammapy model or a list of models" ) for b in bkg_model: self._background_models[b.name] = b self._background_models_mapper[b.datasets_names[0]] = b.name self._parse_background_models() self._update_gammapy_model_list() self._assign_models()
def _parse_background_models(self): """Parse the background models and link the gammapy parameters to nuissance parameters of this plugin and set the prior.""" # TODO way of manually specifying the priors for name, bkg in self._background_models.items(): bkg_paras = parse_gammapy_model(bkg, self._name) for k, v in bkg_paras.items(): para_path = k.split(".") self._nuisance_mapping[k] = para_path self._nuisance_parameters[k] = v if v.is_normalization and v.free: self._nuisance_parameters[k].prior = Truncated_gaussian( mu=1.0, sigma=0.1, lower_bound=0.2, upper_bound=1.8 ) self._nuisance_parameters_dicts[k] = parameter_to_gammapy_dict(v) def _update_background_models(self): # TODO rewrite this to directly update for k, v in self._nuisance_parameters.items(): self._nuisance_parameters_dicts[k]["value"] = v.value p = self._nuisance_mapping[k] # TODO find an elegant way to not hardcode this self._background_models[p[1]].parameters[p[2]].update_from_dict( self._nuisance_parameters_dicts[k] ) def _assign_models(self): """""" for d in self._datasets: d.models = [] tmp = [] for g in self._global_models: tmp.append(g) if d.name in self._background_models_mapper.keys(): tmp.append( self._background_models[self._background_models_mapper[d.name]] ) d.models = Models(tmp)
[docs] def get_log_like(self) -> float: """Return the value of the log-likelihood with the current values for the parameters stored in the model instance.""" self._likelihood_model_converted._update_parameters() self._update_background_models() return -0.5 * self._datasets._stat_sum_likelihood()
[docs] def inner_fit(self): return self.get_log_like()
[docs] def get_number_of_data_points(self) -> np.int64: # TODO: check if this works for all allowed data types return np.sum([np.prod(d.counts.data.shape) for d in self._datasets])
[docs] def distribute_covariance(self, result: "_AnalysisResults") -> None: """Function to pass the (estimated) Covariance Matrix to the gammapy parameters so that the gammapy plotting functions can display the correct uncertainty. :param result: the analysis result :type result: BayesianResults or MLEResults """ pass
@property def datasets(self) -> Datasets: """Gammapy datasets of the plugin.""" return self._datasets @property def model(self) -> Model: """Astromodels model of the plugin.""" return self._likelihood_model @property def astromodel_converter(self) -> AstromodelConverter: """AstromodelConverter object used for this plugin.""" return self._likelihood_model_converted @property def gammapy_model(self): """List of all the Gammapy SkyModels.""" if not hasattr(self, "_gammapy_model"): self._update_gammapy_model_list() return self._gammapy_model @property def frame(self) -> str: """Coordinate Frame of the plugin.""" return self._frame