# Source code for calliope.time.masks

"""
Copyright (C) since 2013 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).

masks.py
~~~~~~~~

Functions to pick timesteps from data given certain criteria.

"""

import pandas as pd

from calliope.time import funcs
from calliope.core.util.dataset import split_loc_techs
from calliope import exceptions


def _get_array(data, var, tech, **kwargs):
    """
    Extract ``data[var]`` for technology ``tech`` as a pandas object
    indexed only by timesteps.

    Parameters
    ----------
    data : xarray.Dataset
    var : str
        Name of the variable to extract, e.g. 'resource'.
    tech : str
        Technology to subset on.
    kwargs : dict, optional
        Additional dimensions to subset on, for example
        ``locs=['location1', 'location2']``.

    Returns
    -------
    pandas.Series
        ``var`` subset on ``tech`` (and any kwargs dimensions), with all
        remaining non-time dimensions flattened by taking the mean.

    Raises
    ------
    exceptions.ModelError
        If any requested subset dimension does not exist for ``var``.
    """
    subset = {"techs": tech}
    # **kwargs is always a dict (possibly empty), so no None check is needed
    subset.update(kwargs)

    unusable_dims = (
        set(subset.keys()).difference(["techs", "locs"]).difference(data[var].dims)
    )
    if unusable_dims:
        # `var` is a plain string, so interpolate it directly
        # (the previous `var.name` raised AttributeError on this path)
        raise exceptions.ModelError(
            "Attempting to mask time based on technology {}, "
            "but dimension(s) {} do not exist for parameter {}".format(
                tech, unusable_dims, var
            )
        )

    arr = split_loc_techs(data[var].copy()).loc[subset]
    # Flatten every remaining dimension except 'timesteps' by averaging
    arr = arr.mean(dim=[i for i in arr.dims if i != "timesteps"]).to_pandas()
    return arr


def zero(data, tech, var="resource", **kwargs):
    """
    Returns timesteps where ``var`` for the technology ``tech`` is zero.

    kwargs are additional dimensions to subset on, for example,
    ``locs=['location1', 'location2]``

    """
    series = _get_array(data, var, tech, **kwargs)
    # Keep only the timesteps at which the (flattened) variable is exactly zero
    zero_mask = series == 0
    return series[zero_mask].index


def _concat_indices(indices):
    return pd.concat([i.to_series() for i in indices]).sort_index().index


def _get_minmax_timestamps(series, length, n, how="max", padding=None):
    # Get the max/min timestamps
    group = series.groupby(pd.Grouper(freq=length)).mean()
    timesteps = []
    for _ in range(n):
        if how == "max":
            ts = group.idxmax()
        elif how == "min":
            ts = group.idxmin()
        timesteps.append(ts)
        group = group.drop(ts)

    # Get range of timestamps including padding
    full_timesteps = []
    for ts in timesteps:
        ts_end = ts + pd.Timedelta(length)
        if padding is not None:
            ts -= pd.Timedelta(padding)
            ts_end += pd.Timedelta(padding)
        ts_range = series[ts:ts_end].index[:-1]
        full_timesteps.append(ts_range)

    ts_index = _concat_indices(full_timesteps)

    return ts_index


def extreme(
    data,
    tech,
    var="resource",
    how="max",
    length="1D",
    n=1,
    groupby_length=None,
    padding=None,
    normalize=True,
    **kwargs,
):
    """
    Returns timesteps for period of ``length`` where ``var`` for the technology
    ``tech`` across the given list of ``locations`` is either minimal
    or maximal.

    Parameters
    ----------
    data : xarray.Dataset
    tech : str
        Technology whose `var` to find extreme for.
    var : str, optional
        default 'resource'
    how : str, optional
        'max' (default) or 'min'.
    length : str, optional
        Defaults to '1D'.
    n : int, optional
        Number of periods of `length` to look for, default is 1.
    groupby_length : str, optional
        Group time series and return `n` periods of `length`
        for each group.
    padding : str, optional
        Either Pandas frequency (e.g. '1D') or 'calendar_week'.
        If Pandas frequency, symmetric padding is undertaken, either side of
        `length`
        If 'calendar_week', padding is fit to the calendar week in which the
        extreme day(s) are found.
    normalize : bool, optional
        If True (default), data is normalized
        using :func:`~calliope.time.funcs.normalized_copy`.
    kwargs : dict, optional
        Dimensions of the selected var over which to index. Any remaining
        dimensions will be flattened by mean
    """
    if normalize:
        # Only normalise the desired var as rest of data may contain
        # non-numeric variables!
        data_n = funcs.normalized_copy(data[var].to_dataset(name=var))
    else:
        data_n = data
    arr = _get_array(data_n, var, tech, **kwargs)

    return _extreme_with_padding(arr, how, length, n, groupby_length, padding)
def extreme_diff(
    data,
    tech0,
    tech1,
    var="resource",
    how="max",
    length="1D",
    n=1,
    groupby_length=None,
    padding=None,
    normalize=True,
    **kwargs,
):
    """
    Returns timesteps for period of ``length`` where the diffence in extreme
    value for ``var`` between technologies ``tech0`` and ``tech1`` is either
    a minimum or a maximum.

    Parameters
    ----------
    data : xarray.Dataset
    tech0 : str
        First technology for which we find the extreme of `var`
    tech1 : str
        Second technology for which we find the extreme of `var`
    var : str, optional
        default 'resource'
    how : str, optional
        'max' (default) or 'min'.
    length : str, optional
        Defaults to '1D'.
    n : int, optional
        Number of periods of `length` to look for, default is 1.
    groupby_length : str, optional
        Group time series and return `n` periods of `length`
        for each group.
    padding : str, optional
        Either Pandas frequency (e.g. '1D') or 'calendar_week'.
        If Pandas frequency, symmetric padding is undertaken, either side of
        `length`
        If 'calendar_week', padding is fit to the calendar week in which the
        extreme day(s) are found.
    normalize : bool, optional
        If True (default), data is normalized
        using :func:`~calliope.time.funcs.normalized_copy`.
    kwargs : dict, optional
        Dimensions of the selected var over which to index. Any remaining
        dimensions will be flattened by mean
    """
    if normalize:
        # Only normalise the desired var as rest of data may contain
        # non-numeric variables!
        data_n = funcs.normalized_copy(data[var].to_dataset(name=var))
    else:
        data_n = data
    arr0 = _get_array(data_n, var, tech0, **kwargs)
    arr1 = _get_array(data_n, var, tech1, **kwargs)
    # Find the extreme of the difference between the two technologies
    arr = arr0 - arr1

    return _extreme_with_padding(arr, how, length, n, groupby_length, padding)
def _extreme(arr, how="max", length="1D", n=1, groupby_length=None, padding=None): if groupby_length: groupby = pd.Grouper(freq=groupby_length) group_indices = [] grouping = arr.groupby(groupby) for k in grouping.groups.keys(): s = grouping.get_group(k) group_indices.append(_get_minmax_timestamps(s, length, n, how, padding)) ts_index = _concat_indices(group_indices) else: ts_index = _get_minmax_timestamps(arr, length, n, how, padding) return ts_index def _extreme_with_padding(arr, how, length, n, groupby_length, padding): if padding == "calendar_week": if n != 1 or length != "1D": raise ValueError( "calendar_week padding only supports n=1 and length=1D for now." ) result = _extreme(arr, how, length, n, groupby_length, padding=None) # get week padding for each day in result days = list(result.groupby(result.dayofyear).values()) weeks = pd.DatetimeIndex(days[0]) for d in days: weeks = weeks.union(_calendar_week_padding(d, arr)) # concatenate the weeks into one index and drop possible duplicates return pd.DatetimeIndex(weeks).drop_duplicates() else: return _extreme(arr, how, length, n, groupby_length, padding) def _calendar_week_padding(day, arr): """ Given a day, returns the whole calendar week which contains that day """ days = len(day.day.unique()) if not days == 1: raise ValueError( "Only a single day at a time may be used for calendar_week padding, " "but {} days were passed.".format(days) ) # Using day of week, figure out how many days before and after to get # a complete week days_before = 6 - day[0].dayofweek days_after = 6 - days_before # Turn it into a week start_time = day[0] - pd.Timedelta("{}D".format(days_before)) end_time = day[-1] + pd.Timedelta("{}D".format(days_after)) before = arr[start_time : day[0]].index[:-1] after = arr[day[-1] : end_time].index[1:] result_week = before.append(day).append(after) return result_week