diff --git a/docs/inputs/agents.rst b/docs/inputs/agents.rst index d52b09daf..9de69d5a3 100644 --- a/docs/inputs/agents.rst +++ b/docs/inputs/agents.rst @@ -102,8 +102,8 @@ The columns have the following meaning: ``obj_data1`` - A weight associated with the objective. - Whether it is used will depend in large part on the :ref:`decision method `. + A value associated with the objective. + Its meaning, and whether it is used at all, will depend in large part on the :ref:`decision method `. ``obj_sort1`` @@ -168,23 +168,32 @@ Additional objectives decision methods are available with MUSE, as implemented in :py:mod:`~muse.decisions`: - - :py:func:`mean `: Computes the average across several objectives. - - :py:func:`weighted_sum `: Computes a weighted average across several - objectives. - - :py:func:`lexical_comparion `: Compares objectives using a - binned lexical comparison operator. Aliased to "lexo". This is a `lexicographic method `_ where objectives are compared in a specific order, for example first costs, then environmental emissions. - - :py:func:`retro_lexical_comparion `: A binned lexical - comparison function where the bin size is adjusted to ensure the current crop of - technologies are competitive. Aliased to "retro_lexo". - - :py:func:`epsilon_constraints `: A comparison method which - ensures that first selects technologies following constraints on objectives 2 and - higher, before actually ranking them using objective 1. Aliased to "epsilon" and - "epsilon_con". - - :py:func:`retro_epsilon_constraints `: A variation on - epsilon constraints which ensures that the current crop of technologies are not - deselected by the constraints. Aliased to "retro_epsilon". - - :py:func:`single_objective `: A decision method to allow - ranking via a single objective. + - :py:func:`mean `: Computes the arithmetic mean across multiple objectives, + treating all objectives as equally important. + - :py:func:`weighted_sum `: Computes a weighted average across + objectives using weights defined in ``obj_data``, where higher weights increase the + importance of an objective in the final score. + - :py:func:`lexical_comparion `: Ranks technologies using a + priority ordering of objectives. The first objective is considered within a + tolerance defined by ``obj_data1`` (e.g. 0.1 corresponds to a 10% tolerance), and + only distinguishes technologies outside this tolerance. Subsequent objectives + (``obj_data2``, ``obj_data3``) are used to break ties. Aliased to "lexo". + - :py:func:`retro_lexical_comparion `: A variant of lexical + comparison where tolerances are adjusted so that existing technologies remain + feasible for comparison. Otherwise, behaviour matches ``lexical_comparison``. + Aliased to "retro_lexo". + - :py:func:`epsilon_constraints `: Selects technologies by + optimising the first objective while using all remaining objectives as feasibility + filters. Each filter applies a threshold defined in ``obj_data2`` and + ``obj_data3``, with direction (above or below the threshold) determined by + ``obj_sort2`` and ``obj_sort3``. ``obj_data1`` is not used, but must be provided. + Aliased to "epsilon" and "epsilon_con". + - :py:func:`retro_epsilon_constraints `: A variant of + epsilon constraints where thresholds are adjusted so that existing technologies are + never excluded by feasibility conditions. Otherwise behaves identically to + epsilon_constraints. Aliased to "retro_epsilon". + - :py:func:`single_objective `: Ranks technologies using a single + objective without considering additional objectives or constraints. The functions allow for any number of objectives. However, the format described here allows only for three. diff --git a/src/muse/decisions.py b/src/muse/decisions.py index f3aae64c4..cbfbeb6e4 100644 --- a/src/muse/decisions.py +++ b/src/muse/decisions.py @@ -38,15 +38,19 @@ def weighted_sum(objectives: Dataset, parameters: Any, **kwargs) -> DataArray: "weighted_sum", ] -from collections.abc import Mapping, MutableMapping, Sequence +from collections.abc import Hashable, Mapping, MutableMapping, Sequence from typing import ( Any, Callable, ) +import numpy as np +import xarray as xr from xarray import DataArray, Dataset from muse.registration import registrator +from muse.timeslices import broadcast_timeslice, drop_timeslice +from muse.utilities import tupled_dimension PARAMS_TYPE = Sequence[tuple[str, bool, float]] """Standard decision parameter type. @@ -170,76 +174,210 @@ def weighted_sum(objectives: Dataset, parameters: Mapping[str, float]) -> DataAr def lexical_comparison( objectives: Dataset, parameters: PARAMS_TYPE | Sequence[tuple[str, float]] ) -> DataArray: - """Lexical comparison over the objectives. + """Lexicographic comparison using the best available replacements as reference. Lexical comparison operates by binning the objectives into bins of width - w_i = min_j(p_i o_i^j). Once binned, dimensions other than `asset` and - `technology` are reduced by taking the max, e.g. the largest constraint. - Finally, the objectives are ranked lexographically, in the order given by the - parameters. + + w_i = min_j(p_i o_i^j), + + where o_i^j are the objective values of the candidate replacements and the + minimum is taken over the replacement dimension. Once binned, dimensions + other than ``asset`` and ``replacement`` are reduced by taking the maximum + (e.g. the largest constraint). Finally, the objectives are ranked + lexicographically in the order given by ``parameters``. The result is an array of tuples which can subsequently be compared lexicographically. """ - from muse.utilities import lexical_comparison - assert len(parameters) > 0 - if len(parameters[0]) == 3: - parameters = [(u[0], coeff_sign(u[1], u[2])) for u in parameters] - assert set(objectives.data_vars).issuperset([u[0] for u in parameters]) - order = [u[0] for u in parameters] - binsize = objectives.copy() - for obj_name, weight in parameters: - binsize[obj_name] = binsize[obj_name] * weight + # Convert (name, min/max, coefficient) specifications to + # (name, signed coefficient), where the sign encodes whether the + # objective should be minimised or maximised. + if len(parameters[0]) == 3: + parameters = [ + (name, coeff_sign(minmax, coeff)) for name, minmax, coeff in parameters + ] + + # Lexicographic priority of the objectives. + order = tuple(name for name, _ in parameters) + + # All requested objectives must be present. + assert set(objectives.data_vars).issuperset(order) + + # Define the bin widths as + # + # w_i = min_j(p_i o_i^j), + # + # i.e. the weighted objective values of the candidate replacements, + # taking the minimum over the replacement dimension. + binsize = objectives.copy(deep=True) + + # Temporarily flatten the timeslice MultiIndex to avoid xarray's + # deprecated Dataset assignment path when modifying variables. + if "timeslice" in binsize.indexes: + index_names = binsize.indexes["timeslice"].names + binsize = binsize.reset_index("timeslice") + + # Apply the objective weights (including minimisation/maximisation + # direction encoded in the sign). + for name, weight in parameters: + binsize[name] *= weight + + # Restore the original timeslice MultiIndex structure. + if "timeslice" in objectives.indexes: + binsize = binsize.set_index(timeslice=index_names) + + # Use the smallest weighted objective across replacements as the + # bin width for each objective. binsize = binsize.min("replacement") - return lexical_comparison(objectives, binsize, order=order, bin_last=False).rank( - "replacement" - ) + # Construct lexicographically comparable tuples and rank the + # replacement options accordingly. + return _lexical_comparison( + objectives, + binsize, + order=order, + keep_last_continuous=True, + ).rank("replacement") @register_decision(name="retro_lexo") def retro_lexical_comparison( - objectives: Dataset, parameters: PARAMS_TYPE | Sequence[tuple[str, float]] + objectives: Dataset, + parameters: PARAMS_TYPE | Sequence[tuple[str, float]], ) -> DataArray: - """Lexical comparison over the objectives. + """Lexicographic comparison using current assets as reference. Lexical comparison operates by binning the objectives into bins of width - w_i = p_i o_i, where i are the current assets. Once binned, dimensions other - than `asset` and `replacement` are reduced by taking the max, e.g. the - largest constraint. Finally, the objectives are ranked lexographically, in - the order given by the parameters. + + w_i = p_i o_i, + + where o_i are the objective values of the current assets. Once binned, + dimensions other than ``asset`` and ``replacement`` are reduced by taking + the maximum (e.g. the largest constraint). Finally, the objectives are + ranked lexicographically in the order given by ``parameters``. The result is an array of tuples which can subsequently be compared lexicographically. """ - from muse.utilities import lexical_comparison - assert len(parameters) > 0 + + # Convert (name, min/max, coefficient) specifications to + # (name, signed coefficient), where the sign encodes whether the + # objective should be minimised or maximised. if len(parameters[0]) == 3: - parameters = [(u[0], coeff_sign(u[1], u[2])) for u in parameters] + parameters = [ + (name, coeff_sign(minmax, coeff)) for name, minmax, coeff in parameters + ] + + # Retrofitting compares candidate replacements against the current + # asset, so every asset must also appear amongst the replacements. assert objectives.asset.isin(objectives.replacement).all() - assert set(objectives.data_vars).issuperset([u[0] for u in parameters]) - order = [u[0] for u in parameters] - binsize = Dataset(dict(parameters)) * objectives.sel(replacement=objectives.asset) - return lexical_comparison(objectives, binsize, order=order, bin_last=False).rank( - "replacement" - ) + # Lexicographic priority of the objectives. + order = tuple(name for name, _ in parameters) + + # All requested objectives must be present. + assert set(objectives.data_vars).issuperset(order) + + # Define the bin widths as + # + # w_i = p_i o_i, + # + # where o_i are the objective values of the current assets. The + # objective values are selected by matching each asset to itself + # along the replacement dimension. + binwidths = Dataset(dict(parameters)) * objectives.sel(replacement=objectives.asset) + + # Construct lexicographically comparable tuples and rank the + # replacement options accordingly. + return _lexical_comparison( + objectives, + binwidths, + order=order, + keep_last_continuous=True, + ).rank("replacement") + + +def _lexical_comparison( + objectives: xr.Dataset, + binwidths: xr.Dataset, + order: Sequence[Hashable], + *, + keep_last_continuous: bool = False, +) -> xr.DataArray: + """Lexical comparison over the objectives. + + Lexical comparison operates by binning the objectives into bins of width + ``binwidths``. Once binned, dimensions other than ``asset`` and + ``replacement`` are reduced by taking the maximum (e.g. the largest + constraint). Finally, the objectives are ranked lexicographically in the + order given by ``order``. + + Arguments: + objectives: xr.Dataset containing the objectives to rank. + binwidths: Bin widths used to discretise the objectives. + order: Order in which objectives are compared lexicographically. + keep_last_continuous: Whether the final objective should be left as a + continuous value, rather than being discretised. + + Result: + An array of tuples which can subsequently be compared + lexicographically. + """ + # Restrict to the objectives participating in the comparison and + # temporarily flatten the timeslice MultiIndex to avoid Dataset + # assignment issues in xarray. + result = drop_timeslice(objectives[list(order)]).copy() + + # All objectives are discretised except, optionally, the final + # tie-breaking objective. + discretized = order[:-1] if keep_last_continuous else order + + # Convert objectives to integer-valued bins. + for name in discretized: + result[name] = np.floor(result[name] / binwidths[name]).astype(np.int64) + + # Preserve the final objective as a continuous quantity for + # tie-breaking, while still normalising by its bin width. + if keep_last_continuous: + name = order[-1] + result[name] = result[name] / binwidths[name] + + # Combine the ordered objectives into tuples that can be compared + # lexicographically. + return result.to_array(dim="objective").reduce(tupled_dimension, dim="objective") def _epsilon_constraints( - objectives: Dataset, optimize: str, mask: Any | None = None, **epsilons + objectives: Dataset, + optimize: str, + mask: Any | None = None, + **epsilons, ) -> DataArray: - """Minimizes one objective subject to constraints on other objectives.""" + """Selects the best value of a target objective subject to epsilon constraints. + + Each constraint enforces that an objective must be below (or above, after + sign handling upstream) a threshold, aggregated over all non-(asset, + replacement) dimensions. + """ + # Start with all options feasible constraints = True + + # Build feasibility mask from epsilon constraints for name, epsilon in epsilons.items(): + # Reduce over all non-decision dimensions (e.g. timeslice, region) reduced_dims = set(objectives[name].dims) - {"asset", "replacement"} + + # All slices must satisfy constraint constraints = constraints & (objectives[name] <= epsilon).all(reduced_dims) + # Default mask = something worse than any feasible objective value if mask is None: mask = objectives[optimize].max() + 1 + + # Return objective values, masking infeasible alternatives return objectives[optimize].where(constraints, mask) @@ -249,60 +387,79 @@ def epsilon_constraints( parameters: PARAMS_TYPE | Sequence[tuple[str, bool, float]], mask: Any | None = None, ) -> DataArray: - r"""Minimizes first objective subject to constraints on other objectives. + """Epsilon-constraint optimisation. - The parameters are a sequence of tuples `(name, minimize, epsilon)`, where - `name` is the name of the objective, `minimize` is `True` if minimizing and - false if maximizing that objective, and `epsilon` is the constraint. The - first objective is the one that will be minimized according to: + The first objective is optimised (min or max), while all subsequent + objectives are treated as constraints of the form: - Given objectives :math:`O^{(i)}_t`, with :math:`i \in [|1, N|]` and :math:`t` the - replacement technologies, this function computes the ranking with respect to - :math:`t`: + objective_i <= epsilon_i - .. math:: + after sign normalization. + """ + assert set(objectives.data_vars).issuperset([p[0] for p in parameters]) - \mathrm{ranking}_{O^{(i)}_t < \epsilon_i} O^{(0)}_t + # Remove obj_data parameters if present + optimize_name, optimize_minimize, _ = parameters[0] + # Encode optimization direction + do_minimize = Dataset({optimize_name: 1 if optimize_minimize else -1}) - The first tuple can be restricted to `(name, minimize)`, since `epsilon` is ignored. + # Remaining objectives also get sign encoding + for name, minimize, _ in parameters[1:]: + do_minimize[name] = coeff_sign(minimize, 1) - The result is the matrix :math:`O^{(0)}` modified such minimizing over the - replacement dimension value would take into account the constraints and the - optimization direction (minimize or maximize). In other words, calling - `result.rank('replacement')` will yield the expected result. - """ - assert set(objectives.data_vars).issuperset([param[0] for param in parameters]) - do_minimize = Dataset({k: coeff_sign(v, 1) for k, v, _ in parameters[1:]}) - do_minimize[parameters[0][0]] = 1 if parameters[0][1] else -1 - dict_params = {k: v for k, _, v in parameters[1:] if k in objectives.data_vars} - constraints = do_minimize * Dataset(dict_params) + # Extract epsilon constraints + epsilons = { + name: coeff_sign(minimize, 1) * eps + for name, minimize, eps in parameters[1:] + if name in objectives.data_vars + } + + # Apply sign transformation + constraints + if "timeslice" in objectives.indexes: + do_minimize = broadcast_timeslice(do_minimize) return _epsilon_constraints( - objectives * do_minimize, parameters[0][0], mask=mask, **constraints.data_vars + objectives * do_minimize, + optimize_name, + mask=mask, + **epsilons, ) @register_decision(name="retro_epsilon") def retro_epsilon_constraints( - objectives: Dataset, parameters: PARAMS_TYPE + objectives: Dataset, + parameters: PARAMS_TYPE, ) -> DataArray: - """Epsilon constraints where the current tech is included. + """Epsilon-constraint optimisation with asset-relative thresholds. - Modifies the parameters to the function such that the existing technologies are - always competitive. + Epsilon thresholds are adjusted so that the current technology is always + feasible, ensuring it remains in the choice set. """ + # Extract current asset baseline asset_objectives = objectives.sel(replacement=objectives.asset) - def transform(name, minimize, epsilon=None): + def adapt_param(name, minimize, epsilon=None): + """Adjust epsilon so that current asset is always feasible.""" if epsilon is None: - return name, minimize - am = getattr(asset_objectives, name) - new_eps = am.where(am > epsilon if minimize else am < epsilon, epsilon) - return name, minimize, new_eps - - parameters = [ - transform(*param) for param in parameters if param[0] in objectives.data_vars - ] + return name, minimize, None + + current = asset_objectives[name] + + # Work in the same transformed logic as epsilon_constraints + sign = -1 if minimize else 1 + + # Ensure current asset is not excluded by its own constraint + adjusted = current.where( + (sign * current) <= (sign * epsilon), + epsilon, + ) + + return name, minimize, adjusted + + # Filter valid objectives and adapt epsilons + parameters = [adapt_param(*p) for p in parameters if p[0] in objectives.data_vars] + return epsilon_constraints(objectives, parameters) diff --git a/src/muse/utilities.py b/src/muse/utilities.py index 8056e3342..a5d0d7164 100644 --- a/src/muse/utilities.py +++ b/src/muse/utilities.py @@ -310,47 +310,6 @@ def tupled_dimension(array: np.ndarray, axis: int): return result.reshape(*shape[:-1]) -def lexical_comparison( - objectives: xr.Dataset, - binsize: xr.Dataset, - order: Sequence[Hashable] | None = None, - bin_last: bool = True, -) -> xr.DataArray: - """Lexical comparison over the objectives. - - Lexical comparison operates by binning the objectives into bins of width - `binsize`. Once binned, dimensions other than `asset` and `technology` are - reduced by taking the max, e.g. the largest constraint. Finally, the - objectives are ranked lexographically, in the order given by the parameters. - - Arguments: - objectives: xr.Dataset containing the objectives to rank - binsize: bin size, minimization direction - (+ -> minimize, - -> maximize), and (optionally) order of - lexicographical comparison. The order is the one given - `binsize.data_vars` if the argument `order` is None. - order: Optional array indicating the order in which to rank the tuples. - bin_last: Whether the last metric should be binned, or whether it - should be left as a the type it already is (e.g. no flooring and - no turning to integer.) - - Result: - An array of tuples which can subsequently be compared lexicographically. - """ - if order is None: - order = [u for u in binsize.data_vars] - - assert set(order) == set(binsize.data_vars) - assert set(order).issuperset(objectives) - - result = objectives[order] - for name in order if bin_last else order[:-1]: - result[name] = np.floor(result[name] / binsize[name]).astype(int) - if not bin_last: - result[order[-1]] = result[order[-1]] / binsize[order[-1]] - return result.to_array(dim="variable").reduce(tupled_dimension, dim="variable") - - def merge_assets( capa_a: xr.DataArray, capa_b: xr.DataArray, diff --git a/tests/test_decisions.py b/tests/test_decisions.py index 6bb9525de..36d623d3a 100644 --- a/tests/test_decisions.py +++ b/tests/test_decisions.py @@ -1,8 +1,8 @@ import numpy as np +import xarray as xr from numpy.random import choice, rand from pytest import approx, fixture from scipy.stats import rankdata -from xarray import Dataset from muse.decisions import ( epsilon_constraints, @@ -15,7 +15,7 @@ @fixture def objectives(): - objectives = Dataset() + objectives = xr.Dataset() objectives["replacement"] = choice( list("abcdefghijklmnopqrstuvwxyz"), 10, replace=False ) @@ -55,17 +55,38 @@ def normalize(objective): def test_lexical(): """Test lexical comparison against hand-constructed tuples.""" shape = (5, 10) + + # Three objectives evaluated for each asset/replacement pair. a = rand(*shape) * 10 - 5 b = rand(*shape) * 10 - 5 c = rand(*shape) * 10 - 5 - parameters = [("b", -rand() * 0.1), ("a", rand() * 0.1), ("c", rand())] + # Objectives are compared lexicographically in the order + # b -> a -> c. Negative weights indicate maximisation. + parameters = [ + ("b", -rand() * 0.1), + ("a", rand() * 0.1), + ("c", rand()), + ] param_dict = dict(parameters) + # Lexo defines bin widths as + # + # w_i = min_j(p_i o_i^j), + # + # over the replacement dimension. For maximisation objectives, + # the sign convention means this becomes the negative of the + # largest weighted objective. mina = (a * param_dict["a"]).min(1) minb = -(b * abs(param_dict["b"])).max(1) minc = (c * param_dict["c"]).min(1) + # Construct the expected lexicographic tuples by hand: + # + # 1. Bin the first two objectives by flooring after + # normalisation by their bin widths. + # 2. Leave the final objective continuous so that it acts + # as a tie-breaker. expected = np.zeros(shape=shape, dtype=object) for i in range(shape[0]): for j in range(shape[1]): @@ -75,17 +96,26 @@ def test_lexical(): c[i, j] / minc[i], ) - objectives = Dataset( + objectives = xr.Dataset( { "a": (("asset", "replacement"), a), "b": (("asset", "replacement"), b), "c": (("asset", "replacement"), c), } ) - objectives["asset"] = choice(objectives.replacement, shape[0], replace=False) + + # Associate each asset with one of the replacement options. + objectives["asset"] = choice( + objectives.replacement, + shape[0], + replace=False, + ) actual = lexical_comparison(objectives, parameters) assert actual.shape == expected.shape + + # The decision function returns the ranks of the lexicographic + # tuples along the replacement dimension. for i in range(shape[0]): assert actual.values[i] == approx(rankdata(expected[i])) @@ -99,19 +129,19 @@ def reshape_array(size, shape, start=1): # Test case 1: Basic constraints expected = objectives.a * (objectives.asset == objectives.asset) - params = [("a", True), ("b", True, objectives.b.max() + 1)] + params = [("a", True, 1), ("b", True, objectives.b.max() + 1)] actual = epsilon_constraints(objectives, params) assert actual.values == approx(expected.values) # Test case 2: Negative constraints expected = -objectives.a * (objectives.asset == objectives.asset) - params = [("a", False), ("b", True, objectives.b.max() + 1)] + params = [("a", False, 1), ("b", True, objectives.b.max() + 1)] actual = epsilon_constraints(objectives, params) assert actual.values == approx(expected.values) # Test case 3: Binary choice constraints objectives.b[:] = choice((1, 2), objectives.b.size).reshape(objectives.b.shape) - params = [("a", True), ("b", True, 1.5)] + params = [("a", True, 1), ("b", True, 1.5)] expected = objectives.a.where(objectives.b == 1).fillna(-1) actual = epsilon_constraints(objectives, params, mask=-1) assert actual.values == approx(expected.values) @@ -119,7 +149,7 @@ def reshape_array(size, shape, start=1): # Test case 4: Multiple constraints objectives.b[:] = choice((1, 2), objectives.b.size).reshape(objectives.b.shape) objectives.c[:] = choice((1, 2, 3), objectives.c.size).reshape(objectives.c.shape) - params = [("a", True), ("b", True, 1.5), ("c", False, 1.2)] + params = [("a", True, 1), ("b", True, 1.5), ("c", False, 1.2)] condition = (objectives.b == 1) & (objectives.c >= 2).all("other") expected = objectives.a.where(condition).fillna(-1) actual = epsilon_constraints(objectives, params, mask=-1) diff --git a/tests/test_utilities.py b/tests/test_utilities.py index 76f86500a..053a1cd38 100644 --- a/tests/test_utilities.py +++ b/tests/test_utilities.py @@ -148,60 +148,6 @@ def test_tupled_dimension_3d(): verify_tupled_output(actual, array, axis, indices) -def create_test_objectives(shape=(5, 10)): - """Create test objectives dataset.""" - objectives = xr.Dataset() - for var in ["a", "b", "c"]: - objectives[var] = ("asset", "replacement"), np.random.rand(*shape) * 10 - 5 - objectives["asset"] = np.random.choice( - objectives.replacement, len(objectives.asset), replace=False - ) - return objectives - - -def create_test_binsizes(): - """Create test binsizes dataset.""" - return xr.Dataset( - { - "a": np.random.rand() * 0.1, - "b": -np.random.rand() * 0.1, - "c": np.random.rand(), - } - ) - - -@mark.parametrize("order", [["a", "b", "c"], ["b", "c", "a"], ["c", "a", "b"]]) -def test_lexical_comparison(order): - """Test lexical comparison with and without binning.""" - from muse.utilities import lexical_comparison - - objectives = create_test_objectives() - binsizes = create_test_binsizes() - - def create_expected(bin_last=True): - expected = np.zeros(shape=objectives.a.shape, dtype=object) - for i in range(expected.shape[0]): - for j in range(expected.shape[1]): - values = [] - for k in range(3): - val = objectives[order[k]][i, j] / binsizes[order[k]] - values.append(int(np.floor(val)) if bin_last or k < 2 else val) - expected[i, j] = tuple(values) - return expected - - # Test with binning - actual = lexical_comparison(objectives, binsizes[order]) - expected = create_expected(bin_last=True) - assert actual.shape == expected.shape - assert (actual.values == expected).all() - - # Test without binning last value - actual = lexical_comparison(objectives, binsizes[order], bin_last=False) - expected = create_expected(bin_last=False) - assert actual.shape == expected.shape - assert (actual.values == expected).all() - - def test_merge_assets(): """Test merging assets with different coordinate orders.""" from numpy import arange