Source code for calliope.parallel
"""
Copyright (C) 2013-2017 Calliope contributors listed in AUTHORS.
Licensed under the Apache 2.0 License (see LICENSE file).
parallel.py
~~~~~~~~~~~
Defines the Parallelizer class which manages the creation of arbitrary
numbers of scripted runs, either locally or to be deployed to a cluster.
"""
import copy
import itertools
import os

import numpy as np
import pandas as pd

from . import core
from . import utils


class Parallelizer(object):
"""Arguments:
* ``target_dir``: path to output directory for parallel runs.
* ``config_run``: path to YAML file with run settings. If not
given, the included example run.yaml is used.
"""
def __init__(self, target_dir, config_run=None):
super(Parallelizer, self).__init__()
if not config_run:
config_run = os.path.join(os.path.dirname(__file__),
'example_models', 'national_scale', 'run.yaml')
self.config_file = config_run
self.config = utils.AttrDict.from_yaml(config_run)
self.target_dir = target_dir
self.f_submit = 'submit_{}.sh'
self.f_run = 'run.sh'

    def generate_iterations(self):
        iterations = self.config.parallel.iterations
        if isinstance(iterations, list):
            # List of iteration configs: flatten each to a dict with
            # flat (x.y.z-style) keys, giving one DataFrame row per iteration
            df = pd.DataFrame([d.as_dict(flat=True) for d in iterations])
        else:
            # Mapping of keys to lists of values: build one row per
            # combination in the cartesian product of all value lists
            iter_keys = iterations.keys_nested()
            iter_values = [iterations.get_key(i) for i in iter_keys]
            df = pd.DataFrame(list(itertools.product(*iter_values)),
                              columns=iter_keys)
        # 1-based instead of 0-based index, matching the task IDs
        # (SGE_TASK_ID, LSB_JOBINDEX) used by the generated scripts
        df.index = list(range(1, len(df) + 1))
        return df
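
    # Illustrative example (values hypothetical, not from the source): two
    # iterations [{'techs.pv.costs': 100}, {'techs.pv.costs': 90}] yield a
    # DataFrame with one column 'techs.pv.costs' and index [1, 2]; each row
    # later becomes one settings_XXXX.yaml run definition.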

    def _write_modelcommands(self, f, settings):
"""
Write model execution commands for the settings file ``settings``
to file ``f``
"""
pth = os.path.join('Runs', settings)
        f.write('calliope run --debug {}\n'.format(pth))
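        # e.g. settings='settings_0001.yaml' emits:
        #   calliope run --debug Runs/settings_0001.yaml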

    def _write_submit(self, f, n_iter, config=None):
"""
Write submit script to file ``f``
"""
if config is None:
c = self.config
else:
c = config
        # Differentiate between single and array submission styles
        if c.parallel.style == 'single':
            iter_string = '{}'.format(n_iter)
        elif c.parallel.style == 'array':
            iter_string = '1-{}'.format(n_iter)
        else:
            raise ValueError('Unknown parallel style: '
                             '{}'.format(c.parallel.style))
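        # 'single' submits one job carrying the index n_iter; 'array' submits
        # a job array covering indices 1..n_iter, so the scheduler fans out
        # all iterations from a single submission.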
# Write the script
f.write('#!/bin/sh\n')
if c.parallel.environment == 'qsub':
f.write('#$ -t {}\n'.format(iter_string))
f.write('#$ -N {}\n'.format(c.parallel.name))
f.write('#$ -j y -o Logs/run_$TASK_ID.log\n')
self._write_resources(f, config)
f.write('#$ -cwd\n')
f.write('\n./{} '.format(self.f_run) + '${SGE_TASK_ID}\n')
elif c.parallel.environment == 'bsub':
f.write('#BSUB -J {}[{}]\n'.format(c.parallel.name, iter_string))
self._write_resources(f, config)
f.write('#BSUB -o Logs/run_%I.log\n')
f.write('\n./{} '.format(self.f_run) + '${LSB_JOBINDEX}\n')
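        # For illustration (hypothetical name and size): environment 'qsub'
        # with style 'array', name 'runs' and n_iter=10 produces:
        #   #!/bin/sh
        #   #$ -t 1-10
        #   #$ -N runs
        #   #$ -j y -o Logs/run_$TASK_ID.log
        #   #$ -cwd
        #   ./run.sh ${SGE_TASK_ID}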

    def _write_resources(self, f, config=None):
"""
Write resource requirements (memory, threads, wall time)
to file ``f``
"""
if config is None:
c = self.config
else:
c = config
if c.parallel.environment == 'qsub':
conf = c.get_key('parallel.resources.memory', default=False)
if conf:
mem_gb = conf / 1000.0 # Convert MB in settings to GB
f.write('#$ -l mem_total={:.1f}G\n'.format(mem_gb))
conf = c.get_key('parallel.resources.threads', default=False)
if conf:
            try:
                penv = c.get_key('parallel.parallel_env')
            except KeyError:
                raise KeyError('Must specify parallel.parallel_env '
                               'when requesting threads under qsub.')
f.write('#$ -pe {} {}\n'.format(penv, conf))
elif c.parallel.environment == 'bsub':
conf = c.get_key('parallel.resources.memory', default=False)
if conf:
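                # NB: unlike the qsub branch above, the MB value from the
                # settings is passed through unconverted; LSF's rusage[mem]
                # is read in MB under its default unit configuration
                # (assumption; site configuration can change the unit).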
f.write('#BSUB -R "rusage[mem={:.0f}]"\n'.format(conf))
conf = c.get_key('parallel.resources.threads', default=False)
if conf:
f.write('#BSUB -n {:.0f}\n'.format(conf))
conf = c.get_key('parallel.resources.wall_time', default=False)
if conf:
f.write('#BSUB -W {:.0f}\n'.format(conf))

    def _write_additional_lines(self, f, lines, formats=None):
"""Write additional lines to file ``f``"""
if not isinstance(lines, list):
lines = [lines]
if formats:
            lines = [i.format(**formats) for i in lines]
f.writelines([i + '\n' for i in lines])
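        # e.g. lines=['echo done {id}'] with formats={'id': 3} writes
        # 'echo done 3' (matching the post_run call in generate_runs below).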

    def _get_iteration_config(self, config, index_str, iter_row):
iter_c = config.copy() # iter_c is this iteration's config
# `iteration_override` is a pandas series (dataframe row)
# Build up an AttrDict with the specified overrides
override_c = utils.AttrDict()
for k, v in iter_row.to_dict().items():
# NaN values can show in this row if some but not all iterations
# specify a value, so we simply skip them
            if not isinstance(v, list) and pd.isnull(v):
                # NB the isinstance and pd.isnull checks should cover all
                # cases: a list is never null, while a single value may be.
                # This could still break in unexpected edge cases...
                continue
            # Convert numpy dtypes to plain Python types, else YAML chokes
            if isinstance(v, np.generic):
                v = v.item()  # np.asscalar() is deprecated in newer numpy
if isinstance(v, dict):
override_c.set_key(k, utils.AttrDict(v))
else:
override_c.set_key(k, copy.copy(v))
# Finally, add the override AttrDict to the existing configuration
iter_c.union(override_c, allow_override=True, allow_replacement=True)
# Set output dir in configuration object, this is hardcoded
iter_c.set_key('output.path', os.path.join('Output', index_str))
return iter_c
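
    # Sketch of the override flow (illustrative key): a row containing
    # {'techs.pv.costs': 90} becomes an override AttrDict with that flat key,
    # is union()-ed into this iteration's copy of the configuration, and the
    # iteration's outputs are pointed at Output/<index_str>.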

    def generate_runs(self):
c = self.config
# Create output directory
if self.target_dir:
out_dir = os.path.join(self.target_dir, c.parallel.name)
else:
out_dir = c.parallel.name
os.makedirs(out_dir)
os.makedirs(os.path.join(out_dir, 'Runs'))
os.makedirs(os.path.join(out_dir, 'Logs'))
iterations = self.generate_iterations()
# Save relevant settings in out_dir,
# to figure out what all the numbers mean!
os.makedirs(os.path.join(out_dir, 'Output'))
iterations.to_csv(os.path.join(out_dir, 'Output', 'iterations.csv'))
parallel_f = os.path.join(out_dir, 'Output', 'parallel_settings.yaml')
c.parallel.to_yaml(parallel_f)
# Decide whether to generate a single or multiple submission files
# If different iterations ask for different resources, multiple
# files are necessary
c.set_key('parallel.style', 'array') # Try setting 'array' as default
# Flatten list of iteration keys, check if parallel.resources defined
# anywhere in the list, if so, replace 'array' with 'single', since
# we need each iteration's run script to be able to set its resources
iteration_keys = [d.keys_nested() for d in c.parallel.iterations]
for i in itertools.chain.from_iterable(iteration_keys):
if 'parallel.resources' in i:
c.set_key('parallel.style', 'single')
break
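        # e.g. (illustrative) an iteration overriding
        # 'parallel.resources.memory' needs its own submit script with its
        # own resource header, which a shared array submission cannot
        # provide, hence the fallback to 'single'.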
#
# COMBINE ALL MODEL CONFIG INTO ONE FILE AND WRITE IT TO `out_dir`
#
data_path_adj = c.get_key('parallel.data_path_adjustment', None)
o = core.get_model_config(c, self.config_file,
adjust_data_path=data_path_adj,
insert_defaults=False)
unified_config_file = os.path.join(out_dir, 'Runs', 'model.yaml')
        o.to_yaml(unified_config_file)
c.model = 'model.yaml'
# Always make sure we are saving outputs from iterations!
c.set_key('output.save', True)
#
# SET UP SUBMISSION SCRIPT FOR ARRAY RUNS
#
if c.parallel.style == 'array':
# Write array submission script
submit_file = os.path.join(out_dir, self.f_submit.format('array'))
with open(submit_file, 'w') as f:
self._write_submit(f, n_iter=len(iterations))
os.chmod(submit_file, 0o755)
#
# SET UP RUN SCRIPT AND SUBMISSION SCRIPTS FOR SINGLE RUNS
#
run_file = os.path.join(out_dir, self.f_run)
with open(run_file, 'w') as f:
f.write('#!/bin/sh\n')
f.write('case "$1" in\n\n')
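        # The finished run.sh is a single shell case statement, roughly:
        #   #!/bin/sh
        #   case "$1" in
        #   1) calliope run --debug Runs/settings_0001.yaml
        #   ;;
        #   ...
        #   esac
        # so that `./run.sh N` executes iteration N.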
for iter_id, iter_row in iterations.iterrows():
index_str = '{:0>4d}'.format(iter_id)
iter_c = self._get_iteration_config(c, index_str, iter_row)
settings_file = 'settings_{}.yaml'.format(index_str)
# Write run script entry
            with open(run_file, 'a') as f:
f.write('{}) '.format(iter_id))
if c.get_key('parallel.pre_run', default=False):
self._write_additional_lines(f, c.parallel.pre_run)
self._write_modelcommands(f, settings_file)
if c.get_key('parallel.post_run', default=False):
self._write_additional_lines(f, c.parallel.post_run,
formats={'id': iter_id})
f.write(';;\n\n')
# If style is single, also write a single submission script
if c.parallel.style == 'single':
# Write model run script
submit_file = os.path.join(out_dir,
self.f_submit.format(index_str))
with open(submit_file, 'w') as f:
self._write_submit(f, n_iter=iter_id, config=iter_c)
os.chmod(submit_file, 0o755)
# Write configuration object to YAML file
del iter_c.parallel # parallel settings not needed in each file
iter_c.to_yaml(os.path.join(out_dir, 'Runs', settings_file))
# Final tasks after going through all iterations
with open(run_file, 'a') as f:
f.write('esac\n')
os.chmod(run_file, 0o755)
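
# Minimal usage sketch (paths hypothetical, not part of this module):
#
#     from calliope.parallel import Parallelizer
#
#     parallelizer = Parallelizer(target_dir='runs', config_run='run.yaml')
#     parallelizer.generate_runs()
#
# This creates runs/<parallel.name>/ containing Runs/, Logs/, Output/, the
# run.sh dispatch script, and the submit script(s) to hand to qsub or bsub.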