Source code for calliope.read

"""
Copyright (C) 2013-2017 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).

read.py
~~~~~~~

Functions to read saved model results.

"""

import glob
import logging
import os

import pandas as pd
import xarray as xr

from .utils import AttrDict


REQUIRED_TABLES = ['capacity_factor', 'levelized_cost',
                   'metadata', 'groups', 'shares', 'summary',
                   'time_res']


def _check(path, solution):
    # Superficial check if some key tables are missing
    missing_keys = set(REQUIRED_TABLES) - set(solution.data_vars)
    if len(missing_keys) > 0:
        logging.warning('Solution {} missing tables: '
                        '{}'.format(path, missing_keys))


def read_netcdf(path):
    """Read model solution from NetCDF4 file"""
    with xr.open_dataset(path) as solution:
        solution.load()

    # Deserialize YAML attributes
    for k in ['config_model', 'config_run']:
        solution.attrs[k] = AttrDict.from_yaml_string(solution.attrs[k])

    _check(path, solution)

    return solution
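
# Illustrative usage sketch, not part of the original module: load a single
# solution file written by Calliope. The path argument is hypothetical.
def _example_load_netcdf_solution(path='solution.nc'):
    solution = read_netcdf(path)
    # The run configuration is deserialized into an AttrDict attribute
    print(solution.attrs['config_run'])
    return solution
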
def read_csv(directory):
    solution = AttrDict()
    tables_to_read = glob.glob(directory + '/*.csv')
    if len(tables_to_read) == 0:
        raise IOError('No CSV files found')
    # Only keep basenames without extension
    tables_to_read = [os.path.splitext(os.path.basename(f))[0]
                      for f in tables_to_read]
    arrays = {}
    for f in tables_to_read:
        src = os.path.join(directory, f + '.csv')
        cols = pd.read_csv(src, nrows=1).columns
        # Make everything except the last column a MultiIndex
        series = pd.read_csv(src, index_col=list(range(len(cols) - 1)),
                             parse_dates=True, squeeze=True)
        arrays[f] = xr.DataArray.from_series(series)

    solution = xr.Dataset(arrays)

    # Restore metadata from YAML
    md = AttrDict.from_yaml(os.path.join(directory, 'metadata.yaml'))
    for k in md.keys():
        solution.attrs[k] = md[k]

    _check(directory, solution)

    return solution


def _detect_format(directory):
    """Detects format, falling back to CSV if it can't find NetCDF4"""
    if os.path.exists(os.path.join(directory, 'solution.nc')):
        return 'netcdf'
    else:
        return 'csv'
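
# Illustrative usage sketch, not part of the original module: read a single
# iteration's output regardless of how it was saved, reusing the same format
# detection that `read_dir` applies below. `iteration_dir` is hypothetical.
def _example_read_single_iteration(iteration_dir):
    fmt = _detect_format(iteration_dir)
    if fmt == 'netcdf':
        return read_netcdf(os.path.join(iteration_dir, 'solution.nc'))
    else:
        return read_csv(iteration_dir)
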
def read_dir(directory):
    """Combines output files from `directory` and returns an AttrDict
    containing them all.

    If a solution is missing or there is an error reading it, an empty
    AttrDict is added to the results in its stead and the error is logged.

    """
    results = AttrDict()
    results.iterations = pd.read_csv(os.path.join(directory, 'iterations.csv'),
                                     index_col=0)
    results.solutions = AttrDict()
    for i in results.iterations.index.tolist():
        iteration_dir = os.path.join(directory, '{:0>4d}'.format(i))
        fmt = _detect_format(iteration_dir)
        logging.debug('Iteration: {}, Format detected: {}'.format(i, fmt))
        try:
            if fmt == 'netcdf':
                sol_path = os.path.join(iteration_dir, 'solution.nc')
                results.solutions[i] = read_netcdf(sol_path)
            else:
                sol_path = iteration_dir
                results.solutions[i] = read_csv(sol_path)
            logging.debug('Read as {}: {}'.format(fmt, sol_path))
        except IOError as err:
            logging.warning('I/O error in `{}` at iteration `{}`'
                            ': {}'.format(iteration_dir, i, err))
            # results.solutions[i] = AttrDict()  # add an empty entry
            continue

    return results
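
# Illustrative usage sketch, not part of the original module: load all
# iterations of a parallel run and collect one table from every solution that
# was read successfully. `run_output_dir` is a hypothetical directory holding
# `iterations.csv` plus one zero-padded subdirectory per iteration.
def _example_summarise_run(run_output_dir):
    results = read_dir(run_output_dir)
    return {i: sol['summary']
            for i, sol in results.solutions.items()
            if 'summary' in sol}
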
##
# Functionality to post-process parallel runs into aggregated NetCDF files
##

def union_of_indexes(indexes):
    idx = indexes[0]
    for i in range(len(indexes) - 1):
        idx = idx.union(indexes[i + 1])
    return idx


def get_overarching_time_index(datasets):
    all_time_dims = [v.coords['t'].to_index() for k, v in datasets.items()]
    return union_of_indexes(all_time_dims)


def get_longest_time_index_length(datasets):
    all_time_dims = [len(v.coords['t'].to_index()) for k, v in datasets.items()]
    return max(all_time_dims)


def results_to_dataset(results, run_name, reset_time_index=False):
    # Add 'run' dimension
    for k in results.solutions:
        results.solutions[k].coords['run'] = k
        results.solutions[k]['run'] = k

    # Reindex all datasets with a reset list of integers or with the
    # union of all time indices.
    # In the first case, all loaded datasets should have roughly the
    # same length of dimension t (e.g. multiple runs across a single year)
    if reset_time_index:
        max_len_idx = list(range(get_longest_time_index_length(results.solutions)))
        for k in results.solutions:
            results.solutions[k]['t'] = list(range(len(results.solutions[k]['t'])))
            results.solutions[k] = results.solutions[k].reindex(dict(t=max_len_idx))
    else:
        new_idx = get_overarching_time_index(results.solutions)
        for k in results.solutions:
            results.solutions[k] = results.solutions[k].reindex(dict(t=new_idx))

    ds_results = xr.concat(results.solutions.values(), dim='run')

    # Remove metadata
    for k in list(ds_results.attrs.keys()):
        if k != 'calliope_version':
            del ds_results.attrs[k]

    results.iterations.index.name = 'run'
    results.iterations.columns.name = 'cols_iterations'
    ds_results['iterations'] = results.iterations
    ds_results['iterations'] = ds_results.iterations.astype(str)  # Force to str

    # Add run name dimension
    ds_results['run_name'] = xr.DataArray([run_name],
                                          coords={'run_name': [run_name]})
    ds_results = ds_results.set_coords('run_name')

    return ds_results


def dir_to_dataset(in_dir, run_name, reset_time_index=False):
    results = read_dir(in_dir)
    return results_to_dataset(results, run_name, reset_time_index)


def convert_run_dir_to_netcdf(in_dir, out_file, reset_time_index=False):
    this_dir = os.path.join(in_dir, 'Output')
    run_name = this_dir.split('/')[-1]
    ds = dir_to_dataset(this_dir, run_name, reset_time_index)
    encoding = {k: {'zlib': True, 'complevel': 4} for k in ds.data_vars}
    ds.to_netcdf(out_file, format='netCDF4', encoding=encoding)
    ds.close()  # Force-close NetCDF file after writing


def convert_subdirs_to_netcdfs(in_dir, out_dir, reset_time_index_for_subdirs=None):
    if reset_time_index_for_subdirs is None:
        reset_time_index_for_subdirs = []
    subdirs = [
        i for i in os.listdir(in_dir)
        if os.path.isdir(os.path.join(in_dir, i)) and not i.startswith('.')
    ]
    os.makedirs(out_dir, exist_ok=True)
    for s in subdirs:
        if s in reset_time_index_for_subdirs:
            reset_time_index = True
        else:
            reset_time_index = False
        this_path = os.path.join(in_dir, s)
        out_file = os.path.join(out_dir, s + '.nc')
        if os.path.exists(out_file):
            print('File exists, skipping: {}'.format(out_file))
        else:
            print('Processing {}'.format(this_path))
            convert_run_dir_to_netcdf(this_path, out_file, reset_time_index)


def combine_subdir_netcdfs(in_dir, out_file):
    in_files = glob.glob(os.path.join(in_dir, '*.nc'))
    datasets = [xr.open_dataset(i) for i in in_files]
    # Wrap the list in a dict, since get_overarching_time_index expects a
    # mapping of datasets
    t_idx = get_overarching_time_index(dict(enumerate(datasets)))
    for i in range(len(datasets)):
        datasets[i] = datasets[i].reindex(dict(t=t_idx))
    ds = xr.concat(datasets, dim='run_name')
    encoding = {k: {'zlib': True, 'complevel': 4} for k in ds.data_vars}
    ds.to_netcdf(out_file, format='netCDF4', encoding=encoding)
    ds.close()  # Force-close NetCDF file after writing
    for d in datasets:
        d.close()
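
# Illustrative usage sketch, not part of the original module: typical
# post-processing sequence for a set of parallel runs. All paths are
# hypothetical; each subdirectory of `runs_dir` is expected to contain an
# 'Output' directory, as assumed by convert_run_dir_to_netcdf.
def _example_aggregate_parallel_runs(runs_dir, scratch_dir, combined_file):
    # Convert each run subdirectory into one aggregated NetCDF file
    convert_subdirs_to_netcdfs(runs_dir, scratch_dir)
    # Then combine the per-run files along the 'run_name' dimension
    combine_subdir_netcdfs(scratch_dir, combined_file)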