
"""
Copyright (C) 2013-2017 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).

time_funcs.py
~~~~~~~~~~~~~

Functions to process time series data.

"""

import logging

import pandas as pd
import xarray as xr
from xarray.ufuncs import fabs  # pylint: disable=no-name-in-module

from . import utils
from . import time_clustering


def normalized_copy(data):
    """
    Return a copy of data, with absolute values taken and normalized to 0-1.
    The maximum across all regions and timesteps is used to normalize.
    """
    ds = data.copy(deep=True)  # Work off a copy
    data_vars_in_t = [v for v in time_clustering._get_datavars(data)
                      if 't' in data[v].dims]
    for var in data_vars_in_t:
        for y in ds.coords['y'].values:
            # Get max across all regions to normalize against
            norm_max = fabs(ds[var].loc[{'y': y}]).max()
            for x in ds.coords['x'].values:
                df = ds[var].loc[{'x': x, 'y': y}]
                ds[var].loc[{'x': x, 'y': y}] = fabs(df) / norm_max
    return ds

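# Usage sketch for ``normalized_copy`` (illustrative only: ``model_data``
# stands in for a Calliope model dataset with 'x', 'y' and 't' dimensions
# and is not defined in this module):
#
#     normalized = normalized_copy(model_data)
#     # Each t-indexed variable now holds absolute values in [0, 1],
#     # scaled per variable and per 'y' by the maximum over all 'x' and 't'
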
def _copy_non_t_vars(data0, data1):
    """Copies non-t-indexed variables from data0 into data1, then returns data1"""
    non_t_vars = [v for v in data0.data_vars
                  if 't' not in data0[v].dims]
    # Manually copy over variables not in `t`. If we don't do this,
    # these vars get polluted with a superfluous `t` dimension
    for v in non_t_vars:
        data1[v] = data0[v]
    return data1


def _combine_datasets(data0, data1):
    """Concatenates data0 and data1 along the t dimension"""
    data_new = xr.concat([data0, data1], dim='t')
    # Ensure time dimension is ordered
    data_new = data_new.loc[{'t': data_new.t.to_pandas().index.sort_values()}]
    return data_new

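# Usage sketch for the two helpers above (illustrative only: ``ds`` is any
# t-indexed Dataset and ``subset_ts`` an arbitrary pandas.DatetimeIndex
# within its time range):
#
#     part_a = ds.loc[{'t': subset_ts}]
#     part_b = ds.drop(subset_ts, dim='t')
#     recombined = _combine_datasets(part_a, part_b)  # 't' is re-sorted
#     recombined = _copy_non_t_vars(ds, recombined)   # restore non-t vars
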
def apply_clustering(data, timesteps, clustering_func, how,
                     normalize=True, **kwargs):
    """
    Apply the given clustering function to the given data.

    Parameters
    ----------
    data : xarray.Dataset
    timesteps : pandas.DatetimeIndex or list of timesteps or None
    clustering_func : str
        Name of clustering function.
    how : str
        How to map clusters to data. 'mean' or 'closest'.
    normalize : bool, optional
        If True (default), data is normalized before clustering is applied,
        using :func:`~calliope.time_funcs.normalized_copy`.
    **kwargs : optional
        Arguments passed to clustering_func.

    Returns
    -------
    data_new_scaled : xarray.Dataset

    """
    # Only apply clustering function on subset of masked timesteps
    if timesteps is None:
        data_to_cluster = data
    else:
        data_to_cluster = data.loc[{'t': timesteps}]

    if normalize:
        data_normalized = normalized_copy(data_to_cluster)
    else:
        data_normalized = data_to_cluster

    # Get function from `clustering_func` string
    func = utils.plugin_load(clustering_func, builtin_module='time_clustering')

    result = func(data_normalized, **kwargs)
    clusters = result[0]  # Ignore other stuff returned

    data_new = time_clustering.map_clusters_to_data(data_to_cluster, clusters,
                                                    how=how)

    if timesteps is None:
        data_new = _copy_non_t_vars(data, data_new)
    else:
        # Drop timesteps from old data
        data_new = _copy_non_t_vars(data, data_new)
        data_new = _combine_datasets(data.drop(timesteps, dim='t'), data_new)
        data_new = _copy_non_t_vars(data, data_new)

    # Scale the new/combined data so that the mean for each (x, y, variable)
    # combination matches that from the original data
    data_new_scaled = data_new.copy(deep=True)
    data_vars_in_t = [v for v in time_clustering._get_datavars(data)
                      if 't' in data[v].dims]
    for var in data_vars_in_t:
        scale_to_match_mean = (data[var].mean(dim='t') /
                               data_new[var].mean(dim='t')).fillna(0)
        data_new_scaled[var] = data_new[var] * scale_to_match_mean

    return data_new_scaled

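# Usage sketch for ``apply_clustering`` (illustrative only: the clustering
# function name 'get_clusters_kmeans' and the ``k=20`` keyword are
# assumptions about what ``time_clustering`` provides):
#
#     clustered = apply_clustering(
#         model_data, timesteps=None,
#         clustering_func='get_clusters_kmeans', how='mean', k=20)
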
_RESAMPLE_METHODS = {
    '_weights': 'mean',
    '_time_res': 'sum',
    'r': 'sum',
    'e_eff': 'mean',
}


def resample(data, timesteps, resolution):
    """
    Resample data along the 't' dimension to the given resolution (a pandas
    frequency string, e.g. '6H'). If timesteps is not None, only the given
    subset of timesteps is resampled, then recombined with the rest of the
    data.
    """
    data_new = data.copy(deep=True)
    if timesteps is not None:
        data_new = data_new.loc[{'t': timesteps}]

    # First create a new resampled dataset of the correct size by
    # using first-resample, which should be a quick way to achieve this
    data_rs = data_new.resample(resolution, dim='t', how='first')

    timestep_vars = [v for v in data_new.data_vars
                     if 't' in data_new[v].dims]

    # Resampling adds a spurious `t` dimension to non-t vars; correct that
    for v in data_rs.data_vars:
        if v not in timestep_vars:
            data_rs[v] = data[v]

    for var in timestep_vars:
        if var in _RESAMPLE_METHODS:
            how = _RESAMPLE_METHODS[var]
            data_rs[var] = data_new[var].resample(resolution, dim='t', how=how)
        else:
            # If we don't know how to resample a var, we drop it
            logging.error('Dropping {} because it has '
                          'no resampling method.'.format(var))
            data_rs = data_rs.drop(var)

    # Get rid of the filled-in NaN timestamps
    data_rs = data_rs.dropna(dim='t', how='all')

    # Resampling still permits operational mode
    data_rs.attrs['opmode_safe'] = True

    if timesteps is not None:
        # Combine leftover parts of passed-in data with new data
        data_rs = _copy_non_t_vars(data, data_rs)
        data_rs = _combine_datasets(data.drop(timesteps, dim='t'), data_rs)
        data_rs = _copy_non_t_vars(data, data_rs)
        # Having timesteps with different lengths does not permit
        # operational mode
        data_rs.attrs['opmode_safe'] = False

    return data_rs

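# Usage sketch for ``resample`` (illustrative only: resamples the whole of
# ``model_data`` to 6-hourly resolution):
#
#     data_6h = resample(model_data, timesteps=None, resolution='6H')
#     # data_6h.attrs['opmode_safe'] remains True, since all resulting
#     # timesteps have equal length
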
def drop(data, timesteps, padding=None):
    """
    Drop the given timesteps from data. If padding is given, each contiguous
    block of dropped timesteps is shrunk by ``padding`` at both its start and
    end, so that a margin around the dropped areas is retained.
    """
    if padding:
        ts_per_day = time_clustering._get_timesteps_per_day(data)
        freq = '{}H'.format(24 / ts_per_day)

        # Series of 1 where timesteps 'exist' and 0 where they don't
        s = (pd.Series(1, index=timesteps)
               .reindex(pd.date_range(timesteps[0], timesteps[-1], freq=freq))
               .fillna(0))

        # Blocks of contiguous 1's in the series
        blocks = (s != s.shift()).cumsum().drop(s[s == 0].index)

        # Groups of contiguous areas
        groups = blocks.groupby(blocks).apply(lambda x: (x.index[0], x.index[-1]))

        # Reduce size of each block by `padding` on both sides
        padding = pd.Timedelta(padding)
        dt_indices = [pd.date_range(g[0] + padding, g[1] - padding, freq=freq)
                      for g in groups]

        # Concatenate the DatetimeIndexes by using dummy Series
        timesteps = pd.concat([pd.Series(0, index=i) for i in dt_indices]).index

    # 'Distribute weight' of the dropped timesteps onto the remaining ones
    dropped_weight = data._weights.loc[{'t': timesteps}].sum()

    data = data.drop(timesteps, dim='t')

    data['_weights'] = data['_weights'] + (dropped_weight / len(data['_weights']))

    return data

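# Usage sketch for ``drop`` (illustrative only: ``ts_to_drop`` is a
# pandas.DatetimeIndex of timesteps to remove; padding='24H' retains the
# first and last 24 hours of every contiguous block that would otherwise
# be dropped):
#
#     reduced = drop(model_data, ts_to_drop, padding='24H')
#     # The summed '_weights' of the dropped timesteps are spread evenly
#     # over the remaining ones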