"""Module rparams of viperleed.calc.classes.rparams.

This is the module defining the core class of this package, i.e.,
Rparams. The class contains parameters read from the PARAMETERS
file, and some parameters defined at runtime. The attributes that
represent not-so-obvious user parameters are instances of classes
defined as part of the special sub-package of rparams. Refactored
on 2023-10-23 by @michele-riva.
"""

__authors__ = (
    'Florian Kraushofer (@fkraushofer)',
    'Michele Riva (@michele-riva)',
    )
__copyright__ = 'Copyright (c) 2019-2024 ViPErLEED developers'
__created__ = '2019-06-13'
__license__ = 'GPLv3+'

from collections import defaultdict
import copy
import logging
import os
from pathlib import Path
import random
import shutil
from timeit import default_timer as timer

try:
    import matplotlib.pyplot as plt
except ImportError:
    _CAN_PLOT = False
else:
    _CAN_PLOT = True

import numpy as np

from viperleed.calc.classes.searchpar import SearchPar
from viperleed.calc.files import beams as iobeams
from viperleed.calc.files.iodeltas import checkDelta
from viperleed.calc.files.tenserleed import get_tenserleed_sources
from viperleed.calc.lib import leedbase
from viperleed.calc.lib.base import available_cpu_count
from viperleed.calc.lib.base import parent_name
from viperleed.calc.lib.checksums import KNOWN_TL_VERSIONS
from viperleed.calc.lib.checksums import UnknownTensErLEEDVersionError
from viperleed.calc.lib.version import Version
from viperleed.calc.sections.calc_section import EXPBEAMS_NAMES

from .defaults import DEFAULTS, NO_VALUE, TENSERLEED_FOLDER_NAME
from .limits import PARAM_LIMITS
from .special.base import NotASpecialParameterError
from .special.base import SpecialParameter


_LOGGER = logging.getLogger(parent_name(__name__))
if _CAN_PLOT:
    plt.style.use('viperleed.calc')


class Rparams:
    """Stores the parameters found in a PARAMETERS file (default values
    in __init__), as well as some parameters defined at runtime."""

    def __init__(self):
        self.readParams = defaultdict(list)  # Raw PARAMETERS from file

        # FROM PARAMETERS FILE
        self.ATTENUATION_EPS = 0.001
        self.AVERAGE_BEAMS = None
        self.BULKDOUBLING_EPS = 0.001
        self.BULKDOUBLING_MAX = 10
        self.BULK_LIKE_BELOW = 0.
        self.BULK_REPEAT = None
        self.DOMAINS = {}     # {name: path_to_tensors_zip_or_dir}
        self.DOMAIN_STEP = 1  # area step in percent for domain search
        self.ELEMENT_MIX = {}     # {element_name: splitlist}
        self.ELEMENT_RENAME = {}  # {element_name: chemical_element}
        self.FILAMENT_WF = DEFAULTS['FILAMENT_WF']['lab6']  # work function of emitting cathode
        self.FORTRAN_COMP = ['', '']      # before files, after files
        self.FORTRAN_COMP_MPI = ['', '']  # before files, after files
        self.GAUSSIAN_WIDTH = DEFAULTS['GAUSSIAN_WIDTH']
        self.GAUSSIAN_WIDTH_SCALING = DEFAULTS['GAUSSIAN_WIDTH_SCALING']
        self.HALTING = 2     # 2: major concerns, 1: minor warnings, 0: always
        self.INTPOL_DEG = 3  # degree of interpolation spline used in R-factor calculation
        self.IV_SHIFT_RANGE = self.get_default('IV_SHIFT_RANGE')
        self.LAYER_CUTS = self.get_default('LAYER_CUTS')
        self.LAYER_STACK_VERTICAL = True
        self.LMAX = self.get_default('LMAX')
        self.LOG_LEVEL = DEFAULTS['LOG_LEVEL'][NO_VALUE]
        self.LOG_SEARCH = True
        self.N_BULK_LAYERS = 1  # number of bulk layers
        self.N_CORES = 0        # number of cores
        self.OPTIMIZE = {       # settings for fd optimization
            'which': 'none', 'step': 0., 'minpoints': 4,
            'maxpoints': 10, 'convergence': 0., 'maxstep': 0.
            }
        self.PARABOLA_FIT = {'type': 'none', 'alpha': 1e-2,
                             'mincurv': 5e-3, 'localize': 0}
        self.PHASESHIFT_EPS = DEFAULTS['PHASESHIFT_EPS']['d']  # changed in updateDerivedParams
        self.PHASESHIFTS_CALC_OLD = True  # use old EEASiSSS version  # TODO: once established, set to False or remove
        self.PHASESHIFTS_OUT_OLD = True   # output old PHASESHIFTS file  # TODO: once established, set to False or remove
        self.PHI = DEFAULTS['PHI']  # from BEAM_INCIDENCE
        self.PLOT_IV = {'plot': True, 'axes': 'all', 'colors': [],
                        'font_size': 10, 'legend': 'all',
                        'line_width': 1, 'overbar': False, 'perpage': 2,
                        }
        self.RUN = self.get_default('RUN')  # what segments should be run
        self.R_FACTOR_LEGACY = True  # use old runtime-compiled R-factor calculation
        self.R_FACTOR_TYPE = 1  # 1: Pendry, 2: R2, 3: Zanazzi-Jona
        self.R_FACTOR_SMOOTH = 0
        self.S_OVL = 0.3  # muffin-tin overlap parameter after Rundgren 2021; set or optimize in FD
        self.SCREEN_APERTURE = 110.
        self.SEARCH_BEAMS = 0  # 0: average, 1: integer, 2: fractional
        # SEARCH_CULL: fraction of population, or absolute nr. if >1
        # SEARCH_CULL.type_: clone, genetic, random
        self.SEARCH_CULL = self.get_default('SEARCH_CULL')
        self.SEARCH_MAX_GEN = 100000  # maximum number of generations in search
        # SEARCH_MAX_DGEN: maximum number of generations without change
        # before search is stopped. All: all configs, best: only 1,
        # dec: best 10%. 0: don't use parameter
        self.SEARCH_MAX_DGEN = self.get_default('SEARCH_MAX_DGEN')
        self.SEARCH_MAX_DGEN_SCALING = {'all': None, 'best': None,
                                        'dec': None}  # will be defined in updateDerivedParams
        self.SEARCH_LOOP = False
        self.SEARCH_POPULATION = 0  # trial structures in search
        self.SEARCH_START = 'crandom'
        self.SITE_DEF = {}  # {element_name: {sitename: [atom.num]}}
        self.STOP = False
        self.SUPERLATTICE = np.identity(2, dtype=float)
        self.SUPPRESS_EXECUTION = False
        self.SYMMETRIZE_INPUT = True
        self.SYMMETRY_CELL_TRANSFORM = np.identity(2, dtype=float)
        self.SYMMETRY_EPS = self.get_default('SYMMETRY_EPS')
        self.SYMMETRY_FIND_ORI = True
        self.SYMMETRY_FIX = self.get_default('SYMMETRY_FIX')
        self.SYMMETRY_BULK = {}   # keys: group, rotation, mirror
        self.TENSOR_INDEX = None  # default: pick highest in Tensors folder
        self.TENSOR_OUTPUT = []   # per layer: write Tensor output? (0/1)
        # THEO_ENERGIES: the default values without experimental
        # beams is set in section INIT via self.initTheoEnergies
        self.THEO_ENERGIES = self.get_default('THEO_ENERGIES')
        self.THETA = DEFAULTS['THETA']  # from BEAM_INCIDENCE
        self.TL_IGNORE_CHECKSUM = False
        self.TL_VERSION = self.get_default('TL_VERSION')
        self.T_EXPERIMENT = None
        self.T_DEBYE = None
        self.V0_IMAG = 4.5
        self.V0_REAL = 'default'  # 'default' will read from PHASESHIFTS
        self.V0_Z_ONSET = 1.0
        self.VIBR_AMP_SCALE = []  # read as list of strings, interpret later
        self.ZIP_COMPRESSION_LEVEL = DEFAULTS['ZIP_COMPRESSION_LEVEL']

        # RUN VARIABLES
        self.starttime = timer()
        self.source_dir = None  # where to find 'tensorleed'
        self.workdir = Path(os.getcwd())  # MAIN WORK DIRECTORY; where to find input
        self.compile_logs_dir = None
        self.searchConvInit = {
            'gaussian': None,
            'dgen': {'all': None, 'best': None, 'dec': None}}
        self.searchEvalTime = DEFAULTS['SEARCH_EVAL_TIME']  # time interval for reading SD.TL
        self.output_interval = None  # changed in updateDerivedParams
        self.searchMaxGenInit = self.SEARCH_MAX_GEN
        self.searchStartInit = None

        # script progress tracking
        self.halt = 0
        self.systemName = ''
        self.timestamp = ''
        self.manifest = ['SUPP', 'OUT']
        self.fileLoaded = {
            'PARAMETERS': True, 'POSCAR': False,
            'IVBEAMS': False, 'VIBROCC': False, 'PHASESHIFTS': False,
            'DISPLACEMENTS': False, 'BEAMLIST': False, 'EXPBEAMS': False
            }
        self.runHistory = []   # sections that have been executed before
        self.lastOldruns = []  # copy of runHistory when last oldruns folder was created
        self.superlattice_defined = False
        self.ivbeams_sorted = False
        self.last_R = None
        self.stored_R = {'refcalc': None, 'superpos': None}
        self.checklist = []  # output strings of things to check at program end

        # domains
        self.domainParams = []
        self.pseudoSlab = None

        # data from files
        self.beamlist = []  # lines as strings from BEAMLIST
        self.ivbeams = []   # uses Beam class; list of beams only
        self.expbeams = []  # uses Beam class; contains intensities
        self.theobeams = {  # uses Beam class; contains intensities
            'refcalc': None, 'superpos': None}
        self.expbeams_file_name = ''  # EXPBEAMS or EXPBEAMS.csv?
        self.phaseshifts = []
        self.phaseshifts_firstline = ''  # contains parameters for MUFTIN
        self.refcalc_fdout = ''
        self.superpos_specout = ''
        self.best_v0r = None    # best value for v0r from previous R-factor
        self.disp_blocks = []   # tuples (lines, name) in DISPLACEMENTS file
        self.disp_block_read = False  # current displacements block read?
        self.disp_loops = []    # list of tuples (loopStart, loopEnd)
        self.controlChemBackup = None

        # search parameters
        self.searchpars = []
        self.searchResultConfig = None
        self.search_atlist = []   # atoms that are relevant for the search
        self.search_maxfiles = 0  # maximum number of delta files for one atom
        self.search_maxconc = 1   # maximum number of concentration steps
        self.indyPars = 0  # number of independent parameters
        self.mncstep = 0   # max. steps (geo. times therm.) for one atom
        self.search_index = 0  # which DISPLACEMENTS block is being done

        # plotting data
        self.rfacscatter_all = []  # tuples (gen, r, size, color)
        self.rfacscatter = []      # same, but thinned out along gens
        self.parScatter = [[]]  # tuples (gen, mean scatter, max scatter) per search
        self.searchplots = [('', [], [], [], [])]  # (name, gens, min, max, mean) for each search
        self.lastParScatterFigs = {}  # complete figures for each search, with search names as keys
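
    # Usage sketch (illustrative only, not executed; the values are made
    # up). Presets can be loaded via Rparams.update(), defined below:
    #
    #     >>> rpars = Rparams()
    #     >>> rpars.update({'N_CORES': 4, 'LOG_SEARCH': False})
    #     >>> rpars.N_CORES
    #     4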

    @property
    def is_debug_mode(self):
        """Return whether LOG_LEVEL is more verbose than logging.INFO."""
        return self.LOG_LEVEL < logging.INFO

    @property
    def no_value(self):
        """Return the sentinel used for parameters without a value."""
        return NO_VALUE

    def try_loading_expbeams_file(self):
        """Read an EXPBEAMS file if not already available."""
        if self.fileLoaded['EXPBEAMS']:
            return
        exp_files_provided = (f for f in EXPBEAMS_NAMES if Path(f).is_file())
        try:
            self.expbeams_file_name = next(exp_files_provided)
        except StopIteration:  # Nothing to load
            self.expbeams_file_name = ''
            return
        # Warn if multiple experimental input files were provided
        if next(exp_files_provided, False):
            self.setHaltingLevel(1)
            _LOGGER.warning(
                'Multiple files with experimental I(V) curves were provided. '
                'Check if the root directory contains the correct files. '
                f'Using file {self.expbeams_file_name!r}.'
                )
        err_msg = f'Error while reading file {self.expbeams_file_name}'
        enrange = self.THEO_ENERGIES.as_floats()[:2]
        try:
            self.expbeams = iobeams.readOUTBEAMS(self.expbeams_file_name,
                                                 enrange=enrange)
        except OSError:
            _LOGGER.error(f'{err_msg}.', exc_info=True)
            return
        if self.expbeams:
            self.fileLoaded['EXPBEAMS'] = True
        else:
            _LOGGER.error(f'{err_msg}: No data was read.')

    @classmethod
    def get_default(cls, param):
        """Return the default value of param."""
        try:
            value = DEFAULTS[param]
        except KeyError as err:
            raise ValueError(f'No default found for parameter {param}') from err
        if isinstance(value, tuple):
            value = list(value)
        elif isinstance(value, dict):
            value = copy.deepcopy(value)
        # See if param is a special one
        return cls._to_simple_or_special_param(param, value)

    @staticmethod
    def _to_simple_or_special_param(param, value):
        """Return the value of a standard or special parameter."""
        try:
            special = SpecialParameter.get_subclass(param)
        except NotASpecialParameterError:
            return value
        return special.from_value(value)

    def reset_default(self, param):
        """Reset param to its default value."""
        default = self.get_default(param)
        setattr(self, param, default)

    @staticmethod
    def get_limits(param):
        """Return the smallest and largest acceptable values of param."""
        try:
            return PARAM_LIMITS[param]
        except KeyError as err:
            raise ValueError(f'No limits found for parameter {param}') from err

    def update(self, presets):
        """Load presets into this Rparams object."""
        for param_name, param_value in presets.items():
            param_name = param_name.upper()
            getattr(self, param_name)  # Raise AttributeError if wrong
            value = self._to_simple_or_special_param(param_name, param_value)
            setattr(self, param_name, value)

    def total_energy_range(self):
        """Return the total overlapping energy range of experiment and
        theory. Note that this may change if experimental beams are
        dropped."""
        if not self.expbeams:
            return 0.
        totalrange = 0.
        for b in self.expbeams:
            totalrange += (min(max(b.intens), self.THEO_ENERGIES.max)
                           - max(min(b.intens), self.THEO_ENERGIES.min))
        return totalrange

    def storeRfacScatter(self, x, y, s, c):
        """
        Add a list of points for R-factor scatter plots to
        self.rfacscatter.

        Parameters
        ----------
        x, y, s, c : lists of floats
            Coordinates, size and color of points.

        Returns
        -------
        None.
        """
        spacing = x[-1]/50
        pg = x[-1]
        self.rfacscatter_all.extend(list(zip(x, y, s, c)))
        self.rfacscatter = []
        for p in self.rfacscatter_all[::-1]:
            if p[0] <= pg - spacing or p[0] == pg:
                self.rfacscatter.append(p)
                pg = p[0]
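
    # Worked example of the thinning above: points are kept walking
    # backwards from the newest one, each at least x[-1]/50 generations
    # away from the last kept point. E.g. after storing points at
    # x = [99, 100] (spacing == 2), only the point at 100 survives in
    # self.rfacscatter, while self.rfacscatter_all keeps both.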

    def get_tenserleed_directory(self, wanted_version=None):
        # TODO: replace the default for TL_VERSION with Version('unknown')
        """Return a TensErLEED source from Rparams.source_dir.

        Parameters
        ----------
        wanted_version : Version, str, optional
            Which specific version of TensErLEED should be looked up.
            If not given or None, `Rparams.TL_VERSION` is used; if that
            is also None, the highest available version is returned.
            Default is None.

        Returns
        -------
        source
            The TensErLEED source (directory or zip archive) with the
            requested version, as returned by `get_tenserleed_sources`.

        Raises
        ------
        RuntimeError
            If this method is called before `Rparams.source_dir` is set.
        FileNotFoundError
            If `Rparams.source_dir` has no 'TensErLEED' subdirectories.
        FileNotFoundError
            If `wanted_version` is given, but the corresponding
            directory was not found.
        """
        if not self.source_dir:
            raise RuntimeError(
                f'{type(self).__name__}.get_tenserleed_directory: '
                'source_dir is not set'
                )
        source_tree = self.source_dir.resolve()
        wanted_version = (Version(wanted_version) if wanted_version
                          else self.TL_VERSION)
        sources = get_tenserleed_sources(source_tree)
        # sort sources by .version attribute
        sources = sorted(sources, key=lambda x: x.version)
        if wanted_version is None:  # return highest version
            return max(sources, key=lambda x: x.version)
        # otherwise look for the wanted version
        for source in sources:
            if source.version == wanted_version:
                return source
        raise FileNotFoundError(
            f'Could not find TensErLEED version={wanted_version} in '
            f'{source_tree}.'
            )
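
    # Lookup sketch (the version string is made up): with source_dir set,
    # a specific version can be requested; with no argument, TL_VERSION is
    # used, falling back to the highest available version.
    #
    #     >>> rpars.source_dir = Path('tensorleed')
    #     >>> source = rpars.get_tenserleed_directory(wanted_version='1.76')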

    def updateDerivedParams(self):
        """
        Checks which derived parameters (which cannot be calculated at
        initialization) can be calculated now.

        Returns
        -------
        None.
        """
        # TENSOR_INDEX:
        if self.TENSOR_INDEX is None:
            self.TENSOR_INDEX = leedbase.getMaxTensorIndex()
        # TL_VERSION:
        if self.source_dir is None:
            raise RuntimeError('Cannot determine highest TensErLEED version '
                               'without specifying a source directory')
        if self.TL_VERSION is None:
            # Fetch most recent TensErLEED version
            tl_source = self.get_tenserleed_directory()
            self.TL_VERSION = tl_source.version
            _LOGGER.debug(f'Detected TensErLEED version {str(self.TL_VERSION)}')
        # SEARCH_CONVERGENCE:
        if self.searchConvInit['gaussian'] is None:
            self.searchConvInit['gaussian'] = self.GAUSSIAN_WIDTH
        for k in ['all', 'best', 'dec']:
            if self.SEARCH_MAX_DGEN_SCALING[k] is None:
                self.SEARCH_MAX_DGEN_SCALING[k] = 1 / self.GAUSSIAN_WIDTH_SCALING
            if self.searchConvInit['dgen'][k] is None:
                self.searchConvInit['dgen'][k] = self.SEARCH_MAX_DGEN[k]
        if self.output_interval is None:
            # set output interval to SEARCH_CONVERGENCE dgen, but cap at 1000
            use_dgen = min(dgen
                           for dgen in self.searchConvInit['dgen'].values()
                           if dgen > 0) or 1000
            self.output_interval = int(min(use_dgen or 1000, 1000))  # default to 1000 if all dgen are 0 (default)
        if self.searchStartInit is None:
            self.searchStartInit = self.SEARCH_START
        # Phaseshifts-based:
        if self.fileLoaded['PHASESHIFTS']:
            # get highest required energy index
            hi = len(self.phaseshifts)-1
            if self.THEO_ENERGIES.max is not NO_VALUE:
                for i in range(0, len(self.phaseshifts)):
                    if (self.phaseshifts[i][0]*leedbase.HARTREE_TO_EV
                            > self.THEO_ENERGIES.max):
                        hi = i
                        break
            # LMAX
            if self.PHASESHIFT_EPS == 0:
                self.PHASESHIFT_EPS = DEFAULTS['PHASESHIFT_EPS']['f']
            min_set = self.LMAX.has_min
            if not min_set:
                self.LMAX.min = 6
            if not self.LMAX.has_max:  # determine value from PHASESHIFT_EPS
                lmax = 1
                for el in self.phaseshifts[hi][1]:  # only check highest energy
                    for i, val in enumerate(el):
                        if abs(val) > self.PHASESHIFT_EPS and (i+1) > lmax:
                            lmax = i+1
                if lmax < 8 and not min_set:
                    _LOGGER.debug('Found small LMAX value based on '
                                  f'PHASESHIFT_EPS parameter (LMAX={lmax})')
                if lmax > 18:
                    lmax = 18
                    _LOGGER.info(
                        'The LMAX found based on the PHASESHIFT_EPS '
                        'parameter is greater than 18, which is currently '
                        'not supported. LMAX was set to 18.'
                        )
                self.LMAX.max = lmax
            else:  # sanity check: are large values ignored?
                warn = False
                highval = 0
                for el in self.phaseshifts[hi][1]:  # highest energy
                    for i, val in enumerate(el):
                        if abs(val) > 0.1 and (i+1) > self.LMAX.max:
                            warn = True
                            highval = max(highval, abs(val))
                if warn:
                    _LOGGER.warning(
                        'The LMAX value defined in the PARAMETERS '
                        'file leads to large phaseshift values being ignored '
                        f'(highest value: {highval}). Consider using a higher '
                        'LMAX, or defining LMAX indirectly via PHASESHIFT_EPS.'
                        )
                    self.setHaltingLevel(1)
            # V0_REAL
            if self.V0_REAL == 'default':
                llist = self.phaseshifts_firstline.split()
                c = []
                try:
                    for i in range(0, 4):
                        c.append(float(llist[i + 1]))
                except ValueError:
                    _LOGGER.error('Could not read Muftin parameters from '
                                  'PHASESHIFTS file.')
                    raise
                self.V0_REAL = c

    def updateCores(self):
        """
        If self.N_CORES is undefined, tries to find it.

        Raises
        ------
        RuntimeError
            If unable to detect the number of cores.

        Returns
        -------
        None
        """
        if self.N_CORES:
            return
        try:
            self.N_CORES = available_cpu_count()
        except Exception:
            _LOGGER.error('Failed to detect number of cores.')
            raise
        _LOGGER.info('Automatically detected number '
                     f'of available CPUs: {self.N_CORES}')
        if not self.N_CORES:
            _LOGGER.error('Failed to detect number of cores.')
            raise RuntimeError('N_CORES undefined, automatic detection failed')

    def resetSearchConv(self):
        """
        Resets the search convergence and tracking parameters to their
        initial values.

        Returns
        -------
        None.
        """
        self.controlChemBackup = None
        self.disp_block_read = False
        self.rfacscatter_all = []
        self.searchplots.append(('', [], [], [], []))
        self.parScatter.append([])
        self.SEARCH_MAX_GEN = self.searchMaxGenInit
        if self.searchConvInit['gaussian'] is not None:
            self.GAUSSIAN_WIDTH = self.searchConvInit['gaussian']
        if self.searchStartInit is not None:
            self.SEARCH_START = self.searchStartInit
        if self.SEARCH_START == 'control':
            self.SEARCH_START = 'crandom'
        for k in ['best', 'all', 'dec']:
            if self.searchConvInit['dgen'][k] is not None:
                self.SEARCH_MAX_DGEN[k] = self.searchConvInit['dgen'][k]

    def setHaltingLevel(self, set_to):
        """
        Sets the halting level self.halt to the value set_to, if that
        value is higher than the current halting level. Outputs a debug
        message.

        Parameters
        ----------
        set_to : int
            New value for self.halt.

        Returns
        -------
        None.
        """
        if set_to > self.halt:
            _LOGGER.debug(f'Raising halting level to {set_to}')
            self.halt = set_to

    def initTheoEnergies(self):
        """
        Initializes values for the THEO_ENERGIES parameter, either to
        the default, or, if experimental beams were loaded, to values
        corresponding to the experimental energy range.

        Returns
        -------
        None.
        """
        energies = self.THEO_ENERGIES
        if energies.defined:
            return
        info = self.fileLoaded['EXPBEAMS'] and not energies.has_bounds
        if self.fileLoaded['EXPBEAMS']:
            minen = min(en for beam in self.expbeams for en in beam.intens)
            maxen = max(en for beam in self.expbeams for en in beam.intens)
            values = [minen - 3, maxen + 3, 3]
        else:
            values = self.get_default('THEO_ENERGIES - no experiments')
            # TODO: eventually, these default values should be moved to
            # some constant or other file
        energies.set_undefined_values(values)
        if info:
            _LOGGER.debug('Initialized energy range from experimental '
                          f'beams file: {energies.start:.2f} to '
                          f'{energies.stop:.2f} eV')
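
    # Typical call order at runtime (a sketch, not the actual driver; the
    # path is hypothetical):
    #
    #     rpars.source_dir = Path('viperleed-tensorleed')
    #     rpars.updateDerivedParams()  # TL_VERSION, TENSOR_INDEX, LMAX, ...
    #     rpars.updateCores()          # auto-detect N_CORES if unset
    #     rpars.initTheoEnergies()     # energy range, from EXPBEAMS if loaded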

    def getFortranComp(self, comp='auto'):
        # TODO: combine with FortranCompMPI from below; lots of repeated code
        """
        Checks whether ifort or gfortran are present, and sets
        FORTRAN_COMP accordingly.

        Parameters
        ----------
        comp : str, optional
            'auto' (default): will check ifort first, then gfortran.
            'ifort' or 'gfortran': only that compiler will be checked.

        Raises
        ------
        ValueError
            If the given compiler is not supported.
        FileNotFoundError
            If the desired compiler was not found.
        """
        # supported compilers, in order of priority
        supported = ['ifort', 'gfortran']
        found = ''
        if comp == 'auto':
            check = supported
        elif comp in supported:
            check = [comp]
        else:
            _LOGGER.error('Rparams.getFortranComp: requested compiler is not '
                          'supported.')
            raise ValueError('Fortran compiler not supported')
        for c in check:
            if shutil.which(c, os.X_OK) is not None:
                found = c
                break
        if found == '':
            if comp == 'auto':
                _LOGGER.error('Rparams.getFortranComp: No fortran compiler '
                              'found.')
            else:
                _LOGGER.error('Rparams.getFortranComp: Requested fortran '
                              'compiler not found.')
            raise FileNotFoundError('Fortran compiler not found')
        if found == 'ifort':
            self.FORTRAN_COMP = [
                'ifort -O2 -I/opt/intel/mkl/include',
                '-L/opt/intel/mkl/lib/intel64 -lmkl_intel_lp64 '
                '-lmkl_intel_thread -lmkl_core -liomp5 -lpthread -lm -ldl '
                '-traceback']  # backtrace should not affect performance
            _LOGGER.debug('Using fortran compiler: ifort')
        elif found == 'gfortran':
            self.FORTRAN_COMP = ['gfortran -O2',
                                 '-llapack -lpthread -lblas '
                                 '-fbacktrace']  # should not affect performance
            _LOGGER.debug('Using fortran compiler: gfortran')

    def getFortranMpiComp(self, comp='auto'):
        """
        Checks whether mpiifort or mpifort are present, and sets
        FORTRAN_COMP_MPI accordingly.

        Parameters
        ----------
        comp : str, optional
            'auto' (default): will check mpiifort first, then mpifort.
            'mpiifort' or 'mpifort': only that compiler will be checked.

        Raises
        ------
        ValueError
            If the given compiler is not supported.
        FileNotFoundError
            If the desired compiler was not found.
        """
        # supported compilers, in order of priority
        supported = ['mpiifort', 'mpifort']
        found = ''
        if comp == 'auto':
            check = supported
        elif comp in supported:
            check = [comp]
        else:
            _LOGGER.error('Rparams.getFortranMpiComp: requested compiler is '
                          'not supported.')
            raise ValueError('Fortran MPI compiler not supported')
        for c in check:
            if shutil.which(c, os.X_OK) is not None:
                found = c
                break
        if found == '':
            if comp == 'auto':
                _LOGGER.error('Rparams.getFortranMpiComp: No fortran compiler '
                              'found.')
            else:
                _LOGGER.error('Rparams.getFortranMpiComp: Requested fortran '
                              'compiler not found.')
            raise FileNotFoundError('Fortran MPI compiler not found')
        if found == 'mpiifort':
            self.FORTRAN_COMP_MPI = ['mpiifort -Ofast', '']
            _LOGGER.debug('Using fortran compiler: mpiifort')
        elif found == 'mpifort':
            self.FORTRAN_COMP_MPI = [
                'mpifort -Ofast -no-pie -fallow-argument-mismatch', ''
                ]
            _LOGGER.debug('Using fortran compiler: mpifort')

    def renormalizeDomainParams(self, config):
        """Takes a list of parameter indices as produced by e.g.
        getRandomConfig, and checks it for domain parameters. If the
        domain parameters can be multiplied by an integer value and
        still be in range, returns the list with multiplied values;
        else, returns the unchanged list."""
        domain_indices = [i for (i, sp) in enumerate(self.searchpars)
                          if sp.mode == 'dom']
        if not domain_indices or all(config[i] == 1 for i in domain_indices):
            return config
        mult = 1
        domain_steps = self.searchpars[domain_indices[0]].steps
        while (sum(config[i]-1 for i in domain_indices) * (mult + 1)
               <= domain_steps-1):
            mult += 1
        for i in domain_indices:
            config[i] = ((config[i] - 1) * mult) + 1
        return config
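
    # Worked example for the renormalization above (made-up values): two
    # domain parameters with steps == 11 and config == [2, 3] give
    # sum(config[i] - 1) == 3, so mult grows while 3 * (mult + 1) <= 10,
    # stopping at mult == 3. The entries become (2-1)*3 + 1 == 4 and
    # (3-1)*3 + 1 == 7: the domain weights are stretched over the
    # available range while keeping their 1:2 ratio.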

    def getCenteredConfig(self):
        """Returns a list of 'centered' parameter indices, i.e. all as
        close to 'no displacement' as possible."""
        return [sp.center for sp in self.searchpars]

    def getPredictConfig(self, best_config=None, curv_cutoff=1e-4):
        """
        Outputs a parameter configuration as a list of integers that is
        as close to the result of the parabola fit as possible. For
        parameters with no valid parabola fit result, or curvatures
        below the cutoff, the value from the best known configuration
        will be used instead. If the best known configuration is not
        passed, these parameters will instead assume the value in the
        center of their displacement range.

        Parameters
        ----------
        best_config : list of int, optional
            The current best configuration, to be used for parameters
            that do not have usable parabola fit results. If not
            passed, centered values will be used instead.
        curv_cutoff : float, optional
            The minimum curvature that a parameter parabola needs to
            have in order to use the parabola minimum. The default is
            1e-4.

        Returns
        -------
        list of int
            Parameter values for a new configuration.
        """
        out = []
        for (i, sp) in enumerate(self.searchpars):
            if (sp.parabolaFit['min'] is not None
                    and not np.isnan(sp.parabolaFit['err_co'])
                    and not np.isnan(sp.parabolaFit['err_unco'])):
                    # sp.parabolaFit['curv'] is not None and
                    # sp.parabolaFit['curv'] > curv_cutoff):
                out.append(int(round(sp.parabolaFit['min'])))
            else:
                if best_config is not None:
                    out.append(best_config[i])
                else:
                    out.append(int((sp.steps + 1)/2))
        return self.renormalizeDomainParams(out)

    def getRandomConfig(self):
        """
        Outputs a new random parameter configuration. Linked parameters
        are guaranteed to have the same value.

        Returns
        -------
        list of int
            Parameter values for a new configuration.
        """
        out = [-1] * len(self.searchpars)
        for i, sp in enumerate(self.searchpars):
            out[i] = random.randint(1, sp.steps)
        for i, sp in enumerate(self.searchpars):
            if sp.linkedTo is not None:
                out[i] = out[self.searchpars.index(sp.linkedTo)]
            elif sp.restrictTo is not None:
                if type(sp.restrictTo) == int:
                    out[i] = sp.restrictTo
                else:
                    out[i] = out[self.searchpars.index(sp.restrictTo)]
        if -1 in out:
            _LOGGER.error(f'Rparams.getRandomConfig failed: {out}')
            return []
        return self.renormalizeDomainParams(out)
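
    # Sketch of the linking semantics above (hypothetical three-parameter
    # search): if searchpars[2] is linked to searchpars[0], the first pass
    # may draw [3, 5, 4]; the second pass then overwrites index 2 with the
    # value at index 0, returning [3, 5, 3].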

    def getOffspringConfig(self, parents):
        """
        Returns a list of parameter indices generated as a random mix
        of the parameters from two of the configurations passed as
        'parents', picked at random if there are more than two. If
        possible, the parents will be picked such that they are not
        identical, but the offspring may nevertheless be identical with
        one of the parents.

        Parameters
        ----------
        parents : list of list of int
            List of surviving configurations.

        Returns
        -------
        list of int
            Parameter values for a new configuration.
        """
        if not parents:
            _LOGGER.warning('Rparams.getOffspringConfig: Cannot create '
                            'offspring configuration without parents. '
                            'Returning random configuration')
            return self.getRandomConfig()
        if len(parents) == 1:
            _LOGGER.warning('Rparams.getOffspringConfig: Only one parent '
                            'passed. Returning clone.')
            return parents[0]
        parents = parents[:]
        i = random.randint(0, len(parents)-1)
        p2 = [parents.pop(i)]
        while len(p2) == 1:
            if len(parents) == 1:
                p2.append(parents[0])
                break
            i = random.randint(0, len(parents)-1)
            p = parents.pop(i)
            if p2[0] != p:
                p2.append(p)
        out = [-1] * len(self.searchpars)
        for i, sp in enumerate(self.searchpars):
            if sp.restrictTo is None and sp.linkedTo is None:
                out[i] = p2[random.randint(0, 1)][i]
        for i, sp in enumerate(self.searchpars):
            if sp.restrictTo is not None:
                if type(sp.restrictTo) == int:
                    out[i] = sp.restrictTo
                else:
                    out[i] = out[self.searchpars.index(sp.restrictTo)+1]
            elif sp.linkedTo is not None:
                out[i] = out[self.searchpars.index(sp.linkedTo)+1]
        if -1 in out:
            _LOGGER.error(f'Rparams.getOffspringConfig failed: {out}')
            return []
        return self.renormalizeDomainParams(out)

    def closePdfReportFigs(self):
        """
        Closes the pdf figures from the Search-progress pdf files,
        which are kept in memory for the Search-report file during a
        run.

        Returns
        -------
        None.
        """
        if not _CAN_PLOT:
            return
        for figures in self.lastParScatterFigs.values():
            for figure in figures:
                try:
                    plt.close(figure)
                except Exception:
                    pass
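
    # Mixing sketch (hypothetical parents, assuming three search parameters
    # with no linked or restricted ones):
    #
    #     >>> rpars.getOffspringConfig([[1, 2, 3], [4, 5, 6]])
    #
    # may return any per-index mix of the two parents, e.g. [1, 5, 3];
    # restricted and linked parameters are filled in afterwards from their
    # target parameters.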

    def generateSearchPars(self, sl, subdomain=False):
        """
        Initializes a list of SearchPar objects, and assigns delta
        files to atoms if required. Also sets the self.indyPars
        parameter.

        Parameters
        ----------
        sl : Slab
            The Slab object containing atom information.
        subdomain : bool, optional
            Set True if executing for multiple domains, and this is not
            the highest-level Rparams object.

        Raises
        ------
        ValueError
            If inconsistent values are found.
        RuntimeError
            If required files are missing.

        Returns
        -------
        None.
        """
        if self.domainParams:
            self.generateSearchPars_domains()
            return
        self.searchpars = []
        self.search_maxfiles = 0  # maximum number of delta files for one atom
        self.search_maxconc = 1
        self.indyPars = 0  # number of independent parameters
        self.mncstep = 0   # max. steps (geo. times therm.) for one atom
        # track which atoms are symmetry-linked to the ones already done to
        # not double-count indyPars
        eqlist = []
        # get list of atoms that appear in the search
        if (2 in self.runHistory or 42 in self.runHistory
                or sl.deltas_initialized):
            # if delta has been run, information what deltas exist is stored
            atlist = [at for at in sl if not at.is_bulk and at.known_deltas]
        else:
            _LOGGER.debug('Delta-amplitudes were not calculated in current '
                          'run; looking for delta files by name.')
            # see what delta files are present
            filelist = [filename for filename in os.listdir('.')
                        if filename.startswith('DEL_')]
            delN = []
            for filename in filelist:
                try:
                    delN.append(int(filename.split('_')[1]))
                except ValueError:
                    filelist.remove(filename)
            atlist = [at for at in sl if not at.is_bulk and at.num in delN]
            for at in atlist:
                deltaCandidates = [fn for fn in filelist
                                   if int(fn.split('_')[1]) == at.num]
                checkEls = list(at.disp_occ.keys())
                # check for vacancy:
                occlists = []
                for k in at.disp_occ:
                    occlists.append(at.disp_occ[k])
                for i in range(0, len(occlists[0])):
                    totalocc = 0.
                    for ol in occlists:
                        if len(ol) <= i:
                            _LOGGER.error(
                                f'Inconsistent occupancy lists for {at}'
                                )
                            raise ValueError('Inconsistent input')
                        else:
                            totalocc += ol[i]
                    if totalocc < 1 - 1e-4:
                        checkEls.append('vac')
                        break
                # now check if all deltas are there:
                for el in checkEls:
                    found = False
                    for df in [f for f in deltaCandidates
                               if f.split('_')[2] == el]:
                        if checkDelta(df, at, el, self):
                            found = True
                            at.known_deltas.append(df)
                            break
                    if not found:
                        _LOGGER.error('No appropriate Delta file found '
                                      f'for {at}, element {el}')
                        raise RuntimeError('Missing Delta file')
        # sanity check: are displacements defined but deltas missing?
        for at in sl:
            # check whether at has non-trivial displacements:
            found = False
            for d in [at.disp_occ, at.disp_geo, at.disp_vib]:
                for el in d:
                    if len(d[el]) > 1:
                        found = True
                        break
            if not found:
                for el in at.disp_vib:
                    if at.disp_vib[el][0] != 0.0:
                        found = True
                        break
            if not found:
                for el in at.disp_geo:
                    if np.linalg.norm(at.disp_geo[el][0]) >= 1e-4:
                        found = True
                        break
            if not found:
                occlists = []
                for k in at.disp_occ:
                    occlists.append(at.disp_occ[k])
                for i in range(0, len(occlists[0])):
                    totalocc = 0.
                    for ol in occlists:
                        if len(ol) <= i:
                            break  # error - will pop up again later...
                        else:
                            totalocc += ol[i]
                    if totalocc < 1 - 1e-4:
                        found = True
                        break
            if found and at not in atlist:
                _LOGGER.error(f'{at} has displacements defined, but no '
                              'delta file was found! Run Delta-Amplitudes.')
                raise RuntimeError('Delta file not found')
            elif not found and at in atlist:
                # delta file is there, but no displacements
                atlist.remove(at)
        sl.deltas_initialized = True
        # sort atlist by displists
        al2 = []
        for dl in sl.displists:
            al2.extend([a for a in atlist if a in dl])
        al2.extend([a for a in atlist if a not in al2])
        atlist = al2
        md = {'geo': 1, 'vib': 2, 'occ': 3}
        splToRestrict = []
        indep = []
        for at in atlist:
            if len(at.known_deltas) > self.search_maxfiles:
                self.search_maxfiles = len(at.known_deltas)
            for fn in at.known_deltas:
                el = fn.split('_')[2]
                if el == 'vac':
                    self.searchpars.append(SearchPar(at, 'geo', 'vac', fn))
                    continue
                mult = 1
                pars = 0
                for (mode, d) in [('vib', at.disp_vib), ('geo', at.disp_geo)]:
                    if el in d:
                        k = el
                    else:
                        k = 'all'
                    if len(d[k]) > 1 or (len(d[k]) == 1 and
                                         ((mode == 'geo'
                                           # and np.linalg.norm(d[k][0]) >= 1e-4
                                           )
                                          or (mode == 'vib'
                                              and d[k][0] != 0.))):
                        pars += 1
                        sp = SearchPar(at, mode, el, fn)
                        self.searchpars.append(sp)
                        if el in at.constraints[md[mode]]:
                            k2 = el
                        else:
                            k2 = 'all'
                        if k2 in at.constraints[md[mode]]:
                            c = at.constraints[md[mode]][k2]
                            if type(c) == int:
                                sp.restrictTo = c + 1
                            else:
                                splToRestrict.append((sp, c))
                        elif len(d[k]) > 1 and at not in eqlist:
                            self.indyPars += 1
                            indep.append(sp)
                        if len(d[k]) > 1 and at in eqlist:
                            spl = [s for s in self.searchpars
                                   if at in s.atom.displist
                                   and s.mode == sp.mode
                                   and (s.el == el or el in ['', 'all'])]
                            if spl:
                                sp.linkedTo = spl[0]
                    mult *= len(d[k])
                if pars == 0:
                    self.searchpars.append(SearchPar(at, 'geo', el, fn))
                if mult > self.mncstep:
                    self.mncstep = mult
            sp = SearchPar(at, 'occ', '', fn)
            self.searchpars.append(sp)
            occsteps = len(next(iter(at.disp_occ.values())))
            if occsteps > 1:
                if occsteps > self.search_maxconc:
                    self.search_maxconc = occsteps
                if at.constraints[3]:
                    c = list(at.constraints[3].values())[0]
                    if type(c) == int:
                        sp.restrictTo = c + 1
                    else:
                        splToRestrict.append((sp, c))
                else:
                    if at not in eqlist:  # occupation will actually vary
                        self.indyPars += 1
                        indep.append(sp)
                if at in eqlist:
                    spl = [s for s in self.searchpars
                           if at in s.atom.displist and s.mode == 'occ']
                    if spl:
                        sp.linkedTo = spl[0]
            eqlist.extend(at.displist)  # do not consider for future indyPars
        splTargets = set()
        for (sp, (at, el)) in splToRestrict:
            # ind = self.searchpars.index(sp)
            spl = [s for s in self.searchpars
                   if s != sp and s.atom == at and s.mode == sp.mode
                   and (s.el == el or el in ['', 'all']
                        or sp.mode == 'occ')]
            if spl:
                sp.restrictTo = spl[0]
                splTargets.add(spl[0])
                if spl[0] not in indep and spl[0].steps > 1:
                    self.indyPars += 1
                    indep.append(spl[0])
        for (sp, (at, el)) in [tup for tup in splToRestrict
                               if tup[0].restrictTo is None
                               and tup[0] not in splTargets]:
            _LOGGER.warning(
                f'Restricting search parameter for atom {sp.atom.num}, '
                f'element {sp.el}, mode {sp.mode} failed: Could not identify '
                f'target search parameter (atom {at.num}, element {el}).'
                )
            if sp not in indep and sp.steps > 1:
                self.indyPars += 1
                indep.append(sp)
        for (i, sp) in enumerate(self.searchpars):
            # restrict to lowest number index, resolve conflicts
            if sp.restrictTo not in self.searchpars:
                continue
            sp2 = None
            while sp2 != sp.restrictTo:
                sp2 = sp.restrictTo
                ind = self.searchpars.index(sp2)
                if (sp2.linkedTo in self.searchpars
                        and self.searchpars.index(sp2.linkedTo) < ind):
                    sp.restrictTo = sp2.linkedTo
                elif (sp2.restrictTo in self.searchpars
                      and self.searchpars.index(sp2.restrictTo) < ind):
                    sp.restrictTo = sp2.restrictTo
            if self.searchpars.index(sp.restrictTo) >= i:
                if sp.restrictTo.restrictTo is None:
                    sp.restrictTo.restrictTo = sp  # invert direction
                if type(sp.restrictTo.restrictTo) == int:
                    sp.restrictTo = sp.restrictTo.restrictTo
                else:
                    sp.restrictTo = None  # remove references to higher indices
        self.search_atlist = atlist
        if not subdomain:
            self.searchpars.append(SearchPar(None, 'dom', '', ''))
            self.searchpars[-1].steps = 2
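
    # File-name convention assumed by the lookup above: delta files are
    # named 'DEL_<atom number>_<element>', so a hypothetical 'DEL_7_Fe'
    # holds the delta amplitudes of atom number 7 for element Fe, and the
    # element 'vac' marks a vacancy contribution.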

    def generateSearchPars_domains(self):
        """Call generateSearchPars for every domain and collate results."""
        self.searchpars = []
        self.indyPars = len(self.domainParams) - 1
        home = os.getcwd()
        for dp in self.domainParams:
            try:
                os.chdir(dp.workdir)
                dp.rp.generateSearchPars(dp.sl, subdomain=True)
            except Exception:
                _LOGGER.error('Error while creating delta '
                              f'input for domain {dp.name}')
                raise
            finally:
                os.chdir(home)
            for sp in dp.rp.searchpars:
                if not isinstance(sp.restrictTo, int):
                    continue
                sp.restrictTo += len(self.searchpars)
            self.searchpars.extend(dp.rp.searchpars)
            self.indyPars += dp.rp.indyPars
        for dp in self.domainParams:
            self.searchpars.append(SearchPar(None, 'dom', '', ''))
            self.searchpars[-1].steps = int(100 / self.DOMAIN_STEP) + 1
        self.search_maxfiles = max(dp.rp.search_maxfiles
                                   for dp in self.domainParams)
        self.search_maxconc = max(dp.rp.search_maxconc
                                  for dp in self.domainParams)
        self.mncstep = max(dp.rp.mncstep for dp in self.domainParams)
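
    # Note on the domain-weight parameters appended above: there is one
    # 'dom' search parameter per domain, each with int(100 / DOMAIN_STEP)
    # + 1 steps; presumably step k corresponds to an area fraction of
    # (k - 1) * DOMAIN_STEP percent, e.g. 6 steps covering 0-100 % for
    # DOMAIN_STEP == 20.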