Source code for viperleed.calc.files.iosearch

"""Module iosearch of viperleed.calc.files.

Functions for reading, processing and writing files relevant
to the search.
"""

__authors__ = (
    'Florian Kraushofer (@fkraushofer)',
    'Alexander M. Imre (@amimre)',
    )
__copyright__ = 'Copyright (c) 2019-2024 ViPErLEED developers'
__created__ = '2020-08-19'
__license__ = 'GPLv3+'

import copy
import logging
import os
from pathlib import Path
import random
import shutil
import time

import fortranformat as ff
import numpy as np

from viperleed.calc.files import poscar
from viperleed.calc.files.beams import writeAUXEXPBEAMS
from viperleed.calc.files.iorfactor import largest_nr_grid_points
from viperleed.calc.files.iorfactor import prepare_rfactor_energy_ranges
from viperleed.calc.files.vibrocc import writeVIBROCC
from viperleed.calc.lib import leedbase
from viperleed.calc.lib.base import BackwardsReader, readIntLine
from viperleed.calc.lib.version import Version

logger = logging.getLogger(__name__)


[docs]class SearchIORaceConditionError(Exception): """Raised if reading of control.chem does not return the expected number of lines""" def __init__(self, message): super().__init__(message)
[docs]class SearchIOEmptyFileError(Exception): """Raised if file read for the search has no content""" def __init__(self, message): super().__init__(message)
def readSDTL_next(filename="SD.TL", offset=0): """ Reads SDTL from offset to end, returns new offset and the content in between as string. Parameters ---------- filename : str, optional The file to read from. The default is "SD.TL". offset : int, optional Where in the file to start. The default is 0. Returns ------- offset : int The position to which SD.TL was read; this should be passed as offset when this function is called next. content : str The content of the file found between the old offset and the new. """ try: with open(filename, "r") as rf: rf.seek(offset) content = rf.read() newoffset = rf.tell() return(newoffset, content) except Exception: logger.error("Error reading SD.TL file") return (offset, "") # return old offset, no content def readSDTL_blocks(content, whichR=0, print_info=False, n_expect=0): """ Attempts to interpret a given string as one or more blocks of an SD.TL file. Parameters ---------- content : str A block of data as read from SD.TL. whichR : int, optional Which r-factor values to use (average / integer / fractional) print_info : bool, optional Whether some basic information should be printed to logger.info and logger.warning. The default is False. n_expect : int, optional Number of configurations expected per block. If a block contains fewer, it will be ignored. Returns ------- returnList : list List consists of one entry per data block. Each entry of the list is a tuple (gen, rfacs, configs), where gen is the generation index, rfacs an ordered list of R-factors, and configs the corresponding parameter configurations. config is a tuple listing (percent, dc) for each domain, with dc the parameter values of that domain. """ returnList = [] blocklist = content.split("CCCCCCCCCCC GENERATION")[1:] for block in blocklist: gen = 0 try: gen = int(block[:13]) if print_info: logger.info("Reading search results from SD.TL, " "generation {}".format(gen)) except ValueError: if print_info: logger.warning("While reading SD.TL, could not interpret " "generation number "+block[:13]) lines = block.split("\n") rfacs = [] configs = [] dpars = [] # list of (list of (percent, parameters)) by domain for line in lines: if "|" in line and line[:3] != "IND": # this is a line with data in it if line.split("|")[0].strip(): # first line - contains R-factor if dpars: configs.append(tuple(dpars)) dpars = [] try: rav = float(line.split("|")[2 + whichR]) # average R rfacs.append(rav) except ValueError: if print_info: logger.error("Could not read R-factor in SD.TL line:\n" + line) try: percent = int(line.split("|")[-2].strip()[:-1]) valstring = line.split("|")[-1].rstrip() pars = readIntLine(valstring, width=4) dpars.append((percent, pars)) except ValueError: if print_info: logger.error("Could not read values in SD.TL line:\n" + line) if dpars: configs.append(tuple(dpars)) if not all([len(dp) == len(configs[0]) for dp in configs]): if print_info: logger.warning("A line in SD.TL contains fewer values than " "the others. Skipping SD.TL block.") continue if gen != 0 and len(rfacs) > 0 and len(configs) > 0: returnList.append((gen, rfacs, tuple(configs))) elif len(configs) < n_expect: if print_info: logger.warning("A block in SD.TL contains fewer configurations " "than expected.") else: if print_info: logger.warning("A block in SD.TL was read but not understood.") return returnList def repeat_fetch_SDTL_last_block(which_beams, expected_params, final, max_repeats=2000, wait_time=5): print_info = final content = None for repeat in range(max_repeats): content = _fetch_SDTL_last_block(which_beams, expected_params, print_info) try: # try again if content is not complete n_search_params_found = len(content[0][2]) except IndexError: continue if n_search_params_found == expected_params: logger.debug("Read complete block from SD.TL file after " f"{repeat+1} read attempt(s).") return content elif n_search_params_found >= expected_params: raise RuntimeError(f"Expected a maximum of {expected_params} " f"parameters in SD.TL, but found " "{n_search_params_found}.") # wait and try again time.sleep(wait_time/1000) print_info = False if not content: raise SearchIOEmptyFileError("No data found in SD.TL file.") # if we get here, we have exceeded the maximum number of repeats raise SearchIORaceConditionError(f"Could not read complete block from " "SD.TL file.") def _fetch_SDTL_last_block(which_beams, n_expect, print_info=False): try: lines = readSDTL_end(n_expect=n_expect) except FileNotFoundError: logger.error("Could not process Search results: SD.TL file not " "found.") raise except Exception: logger.error("Failed to get last block from SD.TL file.") raise # check if the last block contains the expected number of lines lines_str = "\n".join(lines) sdtl_content = readSDTL_blocks("\n".join(lines), whichR=which_beams, print_info=print_info, n_expect=n_expect) return sdtl_content def readSDTL_end(filename="SD.TL", n_expect=0): """ Reads the last generation block from the SD.TL file, starting from the last line containing a GENERATION label. Parameters ---------- filename : str, optional Which file to read n_expect : int, optional Number of configurations expected per block. If the last block contains fewer, it will be ignored, instead reading the second-to-last. Set to 0 to read the last block irrespective of length. Returns ------- lines : str The contents of the file from the last line containing GENERATION to the end. """ # get the last block from SD.TL: bwr = BackwardsReader(filename) try: lines = [""] while len(bwr.data) > 0: while ("CCCCCCCCCCC GENERATION" not in lines[-1] and len(bwr.data) > 0): lines.append(bwr.readline().rstrip()) if len([line for line in lines if "|" in line]) > n_expect: break lines = [""] lines.reverse() finally: bwr.close() return lines def readDataChem(rp, source, cutoff=0, max_configs=0): """ Reads the data from a list of data.chem files, or a single file. Parameters ---------- rp : Rparams Run parameters. source : either a filename, or an iterable containing multiple file names. The files from which to read data; usually of format 'data*.chem'. cutoff : float 0 to read all, else specifies the maximum R-factor to be stored. All structures with higher R will be discarded. max_configs : int 0 to read all, else specifies the maximum number of configurations to read in. Will sort configurations and read in the ones with lowest R-factors first, discard the rest. Returns ------- list Tuples (rfac, config), where config is a tuple listing (percent, dc) for each domain, with dc the parameter values of that domain. """ def getPercent(domain_step, dp_vals): """Transforms from integer domain area values to percent""" dsteps = int(100 / domain_step) + 1 shifted = [v-1 for v in dp_vals] norm = 1 if sum(shifted) > 0: norm = (dsteps-1)/sum(shifted) # note that TensErLEED uses implicit floor int conversion here up to # version 1.6, so results may be inconsistent with SD.TL. pl = [int(round(v*norm))*domain_step for v in shifted[:-1]] pl.append(100 - sum(pl)) return pl if type(source) == str: source = [source] lines = set() for f in source: with open(f, "r") as rf: lines.update(rf.readlines()[1:]) returnList = [] parslen = 0 if max_configs == 0: max_configs = len(lines) if len(lines) > max_configs: lines = set(sorted(list(lines))[:max_configs]) for line in lines: try: rfac = float(line.split("|")[0].strip()) if cutoff != 0 and rfac > cutoff: continue valstring = line.split("|")[1].rstrip() pars = readIntLine(valstring, width=4) except (ValueError, IndexError): logger.debug("Could not read values in data.chem line:\n"+line) continue if len(pars) != parslen: if parslen == 0: parslen = len(pars) else: continue # incomplete line, read while file was written if not rp.domainParams: returnList.append((rfac, ((100, pars[:-1]),))) else: dp_vals = pars[-len(rp.domainParams):] percent = getPercent(rp.DOMAIN_STEP, dp_vals) dpars = [] for v in [len(dp.rp.searchpars) for dp in rp.domainParams]: dpars.append(tuple(pars[:v])) pars = pars[v:] returnList.append((rfac, tuple(zip(percent, dpars)))) return returnList def writeRfInfo(sl, rp, file_path="rf.info"): """ Generates r-factor parameters for the search, combines them with the experimental beams in AUXEXPBEAMS format to make the entire input for the search, returns that as a string. Parameters ---------- sl : Slab The Slab object used in the search. Only used for determining equivalent beams. rp : Rparams Run parameters. file_path : pathlike or str Pathlike to or name of the output file. If str uses rp.workdir /filename. The default is "rf.info". Returns ------- output : str Content of the output file. """ _, theo_range, _, vincr = prepare_rfactor_energy_ranges(rp) # find correspondence experimental to theoretical beams: beamcorr = leedbase.getBeamCorrespondence(sl, rp) # integer & fractional beams iorf = [] for beam in rp.expbeams: if beam.hkfrac[0].denominator != 1 or beam.hkfrac[1].denominator != 1: iorf.append(2) else: iorf.append(1) if rp.TL_VERSION < Version('1.7.0'): formatter = {'energies': ff.FortranRecordWriter('F7.2'), 'int': ff.FortranRecordWriter('I3'), 'beams': ff.FortranRecordWriter("25I3"), 'weights': ff.FortranRecordWriter("25F3.1") } else: formatter = {'energies': ff.FortranRecordWriter('F7.2'), 'int': ff.FortranRecordWriter('I4'), 'beams': ff.FortranRecordWriter("25I4"), 'weights': ff.FortranRecordWriter("25F4.1") } output = (formatter['energies'].write([theo_range.min]).ljust(16) + "EMIN\n") output += ( formatter['energies'].write([theo_range.max + 0.1 * vincr]).ljust(16) + "EMAX\n" ) # !!! BULLSHIT RESULTS WHEN EMAX > MAX ENERGY IN DELTA FILES # TODO: Issue #138 output += (formatter['energies'].write([vincr]).ljust(16) + "EINCR\n") output += (formatter['int'].write([0]).ljust(16) + "IPR - determines amount of output to stdout\n") output += (formatter['energies'].write([rp.V0_IMAG]).ljust(16) + "VI\n") output += (formatter['energies'].write([0.]).ljust(16) + "V0RR\n") output += (formatter['energies'].write([rp.IV_SHIFT_RANGE.start]).ljust(16) + "V01\n") output += (formatter['energies'].write([rp.IV_SHIFT_RANGE.stop]).ljust(16) + "V02\n") output += (formatter['energies'].write([vincr]).ljust(16) + "VINCR\n") output += (formatter['int'].write([rp.R_FACTOR_SMOOTH]).ljust(16) + "ISMOTH\n") output += (formatter['int'].write([0]).ljust(16) + "EOT - 0: exp format, 1: van Hove format\n") output += ((formatter['int'].write([len(rp.ivbeams)]) + formatter['int'].write([len(rp.expbeams)])).ljust(16) + "NTH NEX\n") # numbers of theoretical beams, as they correspond to experimental beams output += (formatter['beams'].write([n+1 for n in beamcorr]) + "\n") output += " DATA MITTEL (integer & fractional beams) :\n" output += formatter['beams'].write(iorf) + "\n" output += " exp - th relationship IBP, beam weights WB\n" output += formatter['beams'].write([n + 1 for n in range(0, len(rp.expbeams))]) + "\n" output += formatter['weights'].write([1]*len(rp.expbeams))+"\n" auxexpbeams = writeAUXEXPBEAMS(rp.expbeams, header=rp.systemName, write=True, numbers=True, version=rp.TL_VERSION) output += auxexpbeams if isinstance(file_path, str): _file_path = rp.workdir / file_path else: _file_path = file_path try: with open(_file_path, 'w') as wf: wf.write(output) except Exception: logger.error(f"Failed to write {_file_path}") raise logger.debug(f"Wrote to {_file_path} successfully") return output def generateSearchInput(sl, rp, steuOnly=False, cull=False, info=True): """ Generates a PARAM and a search.steu file for the search. Parameters ---------- sl : Slab The Slab object used in the search. Only used if there are no domains. rp : Rparams Run parameters. steuOnly : bool, optional If True, PARAM and restrict.f will not be written. The default is False. cull : bool, optional If True, part of the population (given by rp.SEARCH_CULL) will be removed and replaced according to rp.SEARCH_CULL. The default is False. info : bool, optional If False, no info messages will be printed. Returns ------- None """ # first generate list of SearchPar objects and figure out search parameters rp.generateSearchPars(sl) if rp.indyPars == 0: logger.warning("The current search includes no variations.") rp.indyPars = 1 # if search population is undefined, calculate a default: if rp.SEARCH_POPULATION == 0: spop = min(48, 15 + rp.indyPars) rp.SEARCH_POPULATION = int(np.ceil(spop / rp.N_CORES)) * rp.N_CORES # calculate some more things for later expEnergies = set([k for b in rp.expbeams for k in b.intens]) if info: totalrange = rp.total_energy_range() logger.info("Total energy range from experimental beams is " "{:.2g} eV ({} independent fit parameters, {:.2g} eV per " "parameter)" .format(totalrange, rp.indyPars, totalrange / rp.indyPars)) n_theo_energies = rp.THEO_ENERGIES.n_energies if n_theo_energies >= len(expEnergies): logger.warning("Theoretical beams have more data points than " "experimental beams") rp.setHaltingLevel(1) # TODO: will this crash? If so, raise here # merge offsets with displacement lists if rp.domainParams: attodo = [at for dp in rp.domainParams for at in dp.rp.search_atlist] ndom = len(rp.domainParams) astep = rp.DOMAIN_STEP nplaces = max([len(dp.rp.search_atlist) for dp in rp.domainParams]) else: attodo = rp.search_atlist ndom = 1 astep = 100 nplaces = len(rp.search_atlist) for at in attodo: if at.oriState is None: for el in at.offset_occ: if el not in at.disp_occ: logger.error( "{} has occupation offset defined for element {}, " "which does not appear in atom displacement list. " "Value will be skipped, this may cause errors." .format(at, el)) else: at.disp_occ[el] = [v + at.offset_occ[el] for v in at.disp_occ[el]] del at.disp_occ[el] # TODO: why? we write and then delete? Should be offset_occ? If yes, loop should go over .copy for (disp, offset) in [(at.disp_geo, at.offset_geo), (at.disp_vib, at.offset_vib), (at.disp_geo, at.disp_geo_offset)]: dl = [] for el in offset: if el not in disp: disp[el] = copy.copy(disp["all"]) disp[el] = [v + offset[el] for v in disp[el]] dl.append(el) for el in dl: if offset != at.disp_geo_offset: del offset[el] at.disp_geo_offset = {"all": [np.array([0., 0., 0.])]} # PARAM output = """C Here are some global parameterization to be performed C MNBED IS NUMBER OF EXPERIMENTAL BEAMS C MNBTD IS NUMBER OF THEORETICAL BEAMS C MNBMD IS MAX(MNBED,MNBTD) PARAMETER(MNBED = {})\n""".format(len(rp.expbeams)) output += " PARAMETER(MNBTD = {})\n".format(len(rp.ivbeams)) output += " PARAMETER(MNBMD = {})".format(max(len(rp.expbeams), len(rp.ivbeams))) max_nr_data_points = largest_nr_grid_points(rp, rp.theobeams['refcalc'], False) output += """ C MNDATA IS MAX. NUMBER OF DATA POINTS IN EXPERIMENTAL BEAMS PARAMETER(MNDATA = {})""".format(max_nr_data_points) # array size parameter only - add some buffer -> *1.1 output += """ C MNDATT IS NUMBER OF THEORETICAL DATA POINTS IN EACH BEAM PARAMETER(MNDATT = {})""".format(max_nr_data_points) output += """ C MPS IS POPULATION SIZE (number of independent trial structures) PARAMETER(MPS = {})""".format(rp.SEARCH_POPULATION) output += """ C MNDOM is number of domains to be incoherently averaged parameter (MNDOM = {})""".format(ndom) output += """ C MNPLACES IS NUMBER OF DIFFERENT ATOMIC SITES IN CURRENT VARIATION PARAMETER(MNPLACES = {})""".format(nplaces) output += """ C MNFILES IS MAXIMUM NUMBER OF TLEED-FILES PER SITE PARAMETER(MNFILES = {})""".format(rp.search_maxfiles) output += """ C MNCONCS IS MAX NUMBER OF CONCENTRATION STEPS PER SITE PARAMETER(MNCONCS = {})\n""".format(rp.search_maxconc) output += ("C MNPRMK IS NUMBER OF PARAMETERS - INCL ONE CONC PARAMETER " "PER SITE - incl. 1 per domain!\n") output += " PARAMETER(MNPRMK = {})".format(len(rp.searchpars)) output += """ C MNCSTEP IS MAX NUMBER OF VARIATIONS (geo. times therm.) IN 1 FILE C MNATOMS IS RELICT FROM OLDER VERSIONS PARAMETER (MNCSTEP = {}, MNATOMS = 1)""".format(rp.mncstep) # write PARAM if not steuOnly: try: with open("PARAM", "w") as wf: wf.write(output) except Exception: logger.error("Failed at writing PARAM file for search.") raise # now search.steu output = "" # while producing search.steu, collect information on # parameters for searchpars.info info = ("#".rjust(4) + "label".rjust(7) + "atom".rjust(7) + "elem".rjust(7) + "mode".rjust(7) + "steps".rjust(7) + "constr".rjust(7) + "\n") nsteps = [] # keep track of number of steps per parameter for init parcount = 0 # i3 = ff.FortranRecordWriter("I3") i1 = ff.FortranRecordWriter("I1") maxgen = rp.SEARCH_MAX_GEN if rp.TL_VERSION < Version('1.7.0'): formatter = {'int': ff.FortranRecordWriter('I3'), 'gens': ff.FortranRecordWriter('I6'), 'rmut': ff.FortranRecordWriter('F7.4'), 'ctrl': ff.FortranRecordWriter('I3'), } if maxgen > 999999: maxgen = 999999 ctrl_width = 3 else: formatter = {'int': ff.FortranRecordWriter('I5'), 'gens': ff.FortranRecordWriter('I7'), 'rmut': ff.FortranRecordWriter('F7.4'), 'ctrl': ff.FortranRecordWriter('I4'), } if maxgen > 9999999: maxgen = 9999999 ctrl_width = 4 output += (formatter['int'].write([rp.indyPars]).ljust(16) + "number of independent parameters\n") f74 = ff.FortranRecordWriter('F7.4') output += (formatter['rmut'].write([rp.GAUSSIAN_WIDTH]).ljust(16) + "gaussian width control parameter RMUT\n") output += (formatter['int'].write([0]).ljust(16) + "initialisation for " "random number generator - 0: use system time, 1,...: use" " init\n") output += (formatter['int'].write([rp.R_FACTOR_TYPE]).ljust(16) + "1: use RPe -- 2: use R2\n") output += (formatter['int'].write([rp.SEARCH_BEAMS]).ljust(16) + "Optimization of which beam group do you want? " "(0=Aver,1=Int,2=Half)\n") if rp.TL_VERSION >= Version('1.7.1'): outdata = 0 if rp.PARABOLA_FIT["type"] != "none": outdata = 1 output += (formatter['int'].write([outdata]).ljust(16) + "Store configurations and write data.chem files for " "parabola fit (0=false)\n") output += formatter['gens'].write([rp.output_interval]).ljust(16) + "output interval\n" output += (formatter['gens'].write([maxgen]).ljust(16) + "desired number of generations to be performed\n") output += (formatter['int'].write([astep]).ljust(16) + "area fraction step width (%)\n") output += ("SD.TL name of search document file " "(max. 10 characters)\n") output += (formatter['int'].write([ndom]).ljust(16) + "Number of domains under consideration\n") uniquenames = [] for k in range(0, ndom): if ndom == 1: crp = rp csl = sl frompath = "" else: crp = rp.domainParams[k].rp csl = rp.domainParams[k].sl frompath = rp.domainParams[k].workdir info += "---- DOMAIN {} ----\n".format(k+1) prev_parcount = parcount output += ("======= Information about Domain {}: =====================" "=======================\n").format(k+1) output += ( formatter['int'].write([len(crp.search_atlist)]).ljust(16) + "Number of atomic sites in variation: Domain {}\n".format(k+1)) displistcount = len(csl.displists)+1 surfats = csl.getSurfaceAtoms(crp) for (i, at) in enumerate(crp.search_atlist): printvac = False output += ( "------- Information about site {}: -----------------------" "-----------------------\n".format(i+1)) surf = 1 if at in surfats else 0 # Flag that goes into variable NSURF used by search in GetInt output += (formatter['int'].write([surf]).ljust(16) + "Surface (0/1)\n") if at.displist in csl.displists: dlind = csl.displists.index(at.displist) + 1 else: dlind = displistcount displistcount += 1 output += (formatter['int'].write([dlind]).ljust(16) + "Atom number\n") output += ( formatter['int'].write([len(at.known_deltas)]).ljust(16) + "No. of different files for Atom no. {}\n".format(i+1)) for (j, deltafile) in enumerate(at.known_deltas): name = deltafile if frompath: # need to get the file; if True frompath is Path name = "D{}_".format(k+1) + deltafile if len(name) > 15: un = 1 while "D{}_DEL_{}".format(k+1, un) in uniquenames: un += 1 name = "D{}_DEL_{}".format(k+1, un) uniquenames.append(name) try: shutil.copy2(frompath / deltafile, name) except Exception: logger.error("Error getting Delta file {} for search" .format(os.path.relpath( frompath / deltafile))) raise output += "****Information about file {}:\n".format(j+1) output += (name.ljust(16) + "Name of file {} (max. 15 " "characters)\n".format(j+1)) output += (formatter['int'].write([1]).ljust(16) + "Formatted(0/1)\n") el = deltafile.split("_")[-2] if el.lower() == "vac": geo = 1 vib = 0 types = 1 printvac = True else: # geo0 = True vib0 = True for (mode, disp) in [(1, at.disp_geo), (2, at.disp_vib)]: if el in disp: dl = disp[el] else: dl = disp["all"] if mode == 1: geo = len(dl) # if geo == 1 and np.linalg.norm(dl[0]) >= 1e-4: # geo0 = False else: vib = len(dl) if vib == 1 and dl[0] != 0.: vib0 = False if vib == 1 and vib0: vib = 0 # elif geo == 1 and geo0: # geo = 0 if geo > 0 and vib > 0: types = 2 else: types = 1 output += (formatter['int'].write([types]).ljust(16) + "Types of parameters in file {}\n".format(j+1)) label = i1.write([i+1])+i1.write([j+1]) constr = {"vib": "-", "geo": "-"} for mode in ["vib", "geo"]: spl = [s for s in crp.searchpars if s.el == el and s.atom == at and s.mode == mode] if spl and spl[0].restrictTo is not None: sp = spl[0] if type(sp.restrictTo) == int: constr[mode] = str(sp.restrictTo) else: constr[mode] = "#"+str(crp.searchpars.index( sp.restrictTo) + prev_parcount + 1) elif spl and spl[0].linkedTo is not None: constr[mode] = "#"+str( crp.searchpars.index(spl[0].linkedTo) + prev_parcount + 1) if vib > 0: output += (formatter['int'].write([vib]).ljust(16) + "vibrational steps\n") parcount += 1 info += (str(parcount).rjust(4) + ("P"+label).rjust(7) + str(at.num).rjust(7) + el.rjust(7) + "vib".rjust(7) + str(vib).rjust(7) + constr["vib"].rjust(7) + "\n") nsteps.append(vib) if geo > 0: output += (formatter['int'].write([geo]).ljust(16) + "geometrical steps\n") parcount += 1 info += (str(parcount).rjust(4) + ("P"+label).rjust(7) + str(at.num).rjust(7) + el.rjust(7) + "geo".rjust(7) + str(geo).rjust(7) + constr["geo"].rjust(7) + "\n") nsteps.append(geo) output += "****concentration steps for site no. {}\n".format(i+1) occsteps = len(next(iter(at.disp_occ.values()))) output += (formatter['int'].write([occsteps]).ljust(16) + "no. of concentration steps - sum must equal 1 !\n") for j in range(0, occsteps): ol = "" totalocc = 0 for el in at.disp_occ: ol += f74.write([at.disp_occ[el][j]]) totalocc += at.disp_occ[el][j] if printvac: ol += f74.write([1 - totalocc]) output += (ol.ljust(16) + " concentration step no. {}\n".format(j+1)) label = i1.write([i+1])+i1.write([i+1]) parcount += 1 constr = "-" spl = [s for s in crp.searchpars if s.atom == at and s.mode == "occ"] if spl and spl[0].restrictTo is not None: sp = spl[0] if type(sp.restrictTo) == int: constr = str(sp.restrictTo) else: constr = "#"+str(crp.searchpars.index(sp.restrictTo)+1) elif spl and spl[0].linkedTo is not None: constr = "#"+str(crp.searchpars.index(spl[0].linkedTo)+1) info += (str(parcount).rjust(4) + ("C"+label).rjust(7) + str(at.num).rjust(7) + "-".rjust(7) + "occ".rjust(7) + str(occsteps).rjust(7) + constr.rjust(7) + "\n") nsteps.append(occsteps) # add info for domain step parameters for k in range(0, ndom): parcount += 1 if ndom == 1: astepnum = 1 else: astepnum = int(100 / astep) + 1 info += (str(parcount).rjust(4) + ("-".rjust(7)*3) + "dom".rjust(7) + str(astepnum).rjust(7) + "-".rjust(7).rjust(7) + "\n") nsteps.append(astepnum) # now starting configuration output += ("Information about start configuration: (Parameters for each " "place, conc for each place)\n") if rp.SEARCH_START == "control": controlpath = "" if os.path.isfile("control.chem"): controlpath = "control.chem" elif os.path.isfile(os.path.join("SUPP", "control.chem")): controlpath = os.path.join("SUPP", "control.chem") logger.warning("No control.chem file found in working folder, " "using SUPP/control.chem") rp.setHaltingLevel(1) else: logger.warning("No control.chem file found. Defaulting to random " "starting configuration.") rp.SEARCH_START = "random" rp.setHaltingLevel(2) if rp.SEARCH_START == "control": # try reading control.chem # length = population +1 for header; +1 for empty line at the end control_chem_expected_length = rp.SEARCH_POPULATION + 2 try: controllines = _read_control_chem(controlpath, control_chem_expected_length) except SearchIORaceConditionError: # Could not read last generation from control.chem # use backup from SD.TL instead if not rp.controlChemBackup: logger.error("Information in control.chem is incomplete " "and no SD.TL data is stored. " "Defaulting to random starting configuration.") rp.SEARCH_START = "random" rp.setHaltingLevel(2) else: logger.debug("Failed to read current configuration from " "control.chem. " "Loading last known configuraiton from stored " "SD.TL data instead...") controllines = [s+"\n" for s in rp.controlChemBackup.split("\n")] pass except OSError: logger.error("Error reading control.chem file. Defaulting to " "random starting configuration.") rp.SEARCH_START = "random" rp.setHaltingLevel(2) if rp.SEARCH_START == "random": output += (formatter['int'].write([0]).ljust(16) + "Certain start position (1) or random configuration (0)\n") else: output += (formatter['int'].write([1]).ljust(16) + "Certain start position (1) or random configuration (0)\n") if rp.SEARCH_START == "control": if cull and rp.SEARCH_CULL: try: ncull = rp.SEARCH_CULL.nr_individuals(rp.SEARCH_POPULATION) except ValueError: # Too small population ncull = 0 logger.warning( "SEARCH_CULL parameter too large: would cull " "entire population. Culling will be skipped." ) if any([sp.parabolaFit["min"] is not None for sp in rp.searchpars]): # replace one by predicted best getPredicted = True else: getPredicted = False nsurvive = rp.SEARCH_POPULATION - ncull clines = controllines[2:] csurvive = [] if rp.SEARCH_CULL.type_.is_genetic or getPredicted: # prepare readable clines try: csurvive = [readIntLine(s, width=ctrl_width) for s in clines[:nsurvive]] except ValueError: if rp.SEARCH_CULL.type_.is_genetic: logger.warning( "SEARCH_CULL: Failed to read old " "configuration from control.chem, cannot run " "genetic algorithm. Defaulting to cloning." ) rp.setHaltingLevel(1) csurvive = [] for (i, line) in enumerate(clines): if i < nsurvive: output += line elif (rp.SEARCH_CULL.type_.is_random or (rp.SEARCH_CULL.type_.is_genetic and csurvive) or getPredicted): if getPredicted: bc = None if csurvive: bc = csurvive[0] nc = rp.getPredictConfig( best_config=bc, mincurv=rp.PARABOLA_FIT["mincurv"]) getPredicted = False elif rp.SEARCH_CULL.type_.is_random: nc = rp.getRandomConfig() else: # "genetic" nc = rp.getOffspringConfig(csurvive) ol = "" for v in nc: ol += formatter['ctrl'].write([v]) output += ol + "\n" else: # "cloning" output += clines[random.randrange(0, nsurvive)] else: # don't cull for (i, line) in enumerate(controllines): if i > 1: # first 2 lines are empty output += line elif rp.SEARCH_START == "centered": for i in range(0, rp.SEARCH_POPULATION): ol = "" for n in nsteps: ol += formatter['ctrl'].write([(n+1)/2]) output += ol + "\n" else: # rp.SEARCH_START == "crandom" pop = [rp.getCenteredConfig()] for i in range(1, rp.SEARCH_POPULATION): pop.append(rp.getRandomConfig()) for p in pop: ol = "" for v in p: ol += formatter['ctrl'].write([v]) output += ol + "\n" # write search.steu try: with open("search.steu", "w") as wf: wf.write(output) except Exception: logger.error("Failed to write search.steu file for search.") raise # write searchpars.info try: with open("searchpars.info", "w") as wf: wf.write(info) except Exception: logger.error( "Failed to write searchpars.info file. This file is " "only for user information and will not affect operation.") if steuOnly: logger.debug("Wrote search.steu input for search.") return None # now restrict.f output = (""" C This subroutine serves to restrict individual parameters to certain values C inside the search. These values are stored in the PARIND(IPARAM,IPOP) array. C Parameters are counted as listed in control file search.steu, including C a concentration parameter after each atomic site, and the domain weight C parameters for each domain at the end of the PARIND array. C C perform restrictions in the following fashion: e.g. C C PARIND(1,IPOP) = 5 C C or C C PARIND(5,IPOP) = PARIND(1,IPOP) C C etc. subroutine restrict(NPRMK,NPS,PARIND,IPOP) INTEGER NPRMK,NPS INTEGER PARIND DIMENSION PARIND(NPRMK,NPS) C begin restrictions """) for (i, sp) in enumerate(rp.searchpars): if sp.restrictTo is None: continue if type(sp.restrictTo) == int: output += " PARIND({},IPOP) = {}\n".format(i+1, sp.restrictTo) else: output += " PARIND({},IPOP) = PARIND({},IPOP)\n".format( i + 1, rp.searchpars.index(sp.restrictTo) + 1) output += (""" C end restrictions RETURN END """) # write restrict.f try: with open("restrict.f", "w") as wf: wf.write(output) except Exception: logger.error("Failed to write restrict.f file for search.") raise logger.debug("Wrote input files for search: PARAM, search.steu, " "restrict.f") return None def _read_control_chem(control_chem_path, expected_lines, max_repeats=2000, sleep_time=5): """ Read the content of the control.chem file located at the specified path and return its lines. Parameters ---------- control_chem_path : path-like The path to the control.chem file. expected_lines : int The expected number of lines in the control.chem file. max_repeats : int, optional The maximum number of repetitions to read the file before giving up. Defaults is 1000. sleep_time : int, optional The time in milliseconds to sleep between each attempt. Default is 1. Returns ------- list of str The lines of the control.chem file. Raises ------ ValueError If max_repeats is less than 1 or sleep_time is less than 0. RuntimeError If the number of lines in the control.chem file exceeds the expected_lines. This should not happen and indicates that expected_lines was set incorrectly. SearchIORaceConditionError If the complete control.chem file could not be read after the maximum number of repetitions. """ if max_repeats < 1: raise ValueError("max_repeats must be >= 1") if sleep_time < 0: raise ValueError("sleep_time must be >= 0 ms") _control_chem_path = Path(control_chem_path) for repeat in range(max_repeats): with open(_control_chem_path, "r") as rf: control_lines = rf.readlines() n_control_lines = len(control_lines) if n_control_lines == expected_lines: logger.debug(f"Read complete control.chem file after {repeat+1} " "read attempt(s).") return control_lines elif n_control_lines > expected_lines: raise RuntimeError(f"Expected at maximum {expected_lines} lines " f"in control.chem, but found {n_control_lines}.") time.sleep(sleep_time/1000) # in milliseconds raise SearchIORaceConditionError("Could not read complete " "control.chem file") def writeSearchOutput(sl, rp, parinds=None, silent=False, suffix=""): """ Modifies data in sl and rp to reflect the search result given by parinds, then writes POSCAR_OUT and VIBROCC_OUT. Parameters ---------- sl : Slab The Slab object to be modified. rp : Rparams The run parameters object to be modified. parinds : iterable, optional The final parameter values to be applied. If None, rp.searchResultConfig will be used instead. Can be float, in which case linear interpolation between the two adjacent displacements will be used. silent : bool, optional Suppresses output to log. The default is False. suffix : str, optional String to be appended to the POSCAR_OUT and VIBROCC_OUT file names. Returns ------- None """ def interpolate(var, ind): if type(ind) == int: return var[ind] o = ind % 1 return (o * var[int(np.ceil(ind))] + (1 - o) * var[int(np.floor(ind))]) if parinds is None: if rp.searchResultConfig is None: logger.error("Failed to write search output: No configuration " "passed.") raise RuntimeError("Failed to write search output") else: parinds = rp.searchResultConfig[0] # If atom and site original states are not yet saved, do it now: for at in sl: at.storeOriState() for site in sl.sitelist: if site.oriState is None: tmp = copy.deepcopy(site) site.oriState = tmp sl.update_cartesian_from_fractional() uci = np.linalg.inv(sl.ucell) for at in sl: # make list of searchpars addressing this atom: sps = [sp for sp in rp.searchpars if sp.atom == at and sp.el != "vac"] if len(sps) > 0: # first deal with occupation: par = [sp for sp in sps if sp.mode == "occ"][0] # exactly one i = rp.searchpars.index(par) newocc = {} for el in at.disp_occ: # store as offset for now, recalculate for sites later at.offset_occ[el] = (interpolate(at.disp_occ[el], parinds[i]-1) - at.site.oriState.occ[el]) newocc[el] = interpolate(at.disp_occ[el], parinds[i]-1) # will be used for re-calculating position # now vibrations: vibpars = [sp for sp in sps if sp.mode == "vib"] # can be empty, or one per element for par in vibpars: i = rp.searchpars.index(par) el = par.el if el not in at.disp_vib: dict_el = "all" else: dict_el = el # store as offset for now, recalculate for sites later at.offset_vib[el] = interpolate(at.disp_vib[dict_el], parinds[i]-1) # now position - recalculate right away: geopars = [sp for sp in sps if sp.mode == "geo"] # can be empty, or one per element if not geopars: continue totalocc = 0 newpos = {} # new position per element totalpos = np.array([0., 0., 0.]) for par in geopars: i = rp.searchpars.index(par) el = par.el if el not in at.disp_geo: dict_el = "all" else: dict_el = el disp = np.copy(interpolate(at.disp_geo[dict_el], parinds[i]-1)) disp[2] *= -1 rdisp = np.dot(uci, disp) newpos[el] = at.oriState.pos + rdisp totalpos = totalpos + (newpos[el] * newocc[el]) totalocc += newocc[el] if totalocc > 0: totalpos = totalpos / totalocc at.pos = totalpos for el in newpos: rel_off = newpos[el] - at.pos off = np.dot(sl.ucell, rel_off) off[2] *= -1 at.offset_geo[el] = off sl.collapse_fractional_coordinates() sl.update_cartesian_from_fractional() sl.update_layer_coordinates() # now update site occupations and vibrations: for site in sl.sitelist: siteats = [at for at in sl if at.site == site and not at.is_bulk] if not siteats: # site is only found in bulk continue for el in site.occ: total_occ = 0 # n_occ = 0 total_vib = 0 # n_vib = 0 for at in siteats: if el in at.offset_occ: total_occ += at.offset_occ[el] # n_occ += 1 if el in at.offset_vib: total_vib += at.offset_vib[el] # n_vib += 1 # if n_occ > 0: offset_occ = total_occ/len(siteats) site.occ[el] = site.oriState.occ[el] + offset_occ # else: # offset_occ = 0.0 # if n_vib > 0: offset_vib = total_vib/len(siteats) site.vibamp[el] = site.oriState.vibamp[el] + offset_vib # else: # offset_vib = 0.0 for at in siteats: if el in at.offset_occ: at.offset_occ[el] -= offset_occ if el in at.offset_vib: at.offset_vib[el] -= offset_vib poscar_fn = "POSCAR_OUT" + suffix tmpslab = copy.deepcopy(sl) tmpslab.sort_original() try: poscar.write(tmpslab, filename=poscar_fn, comments="all", silent=silent) except OSError: logger.error("Exception occurred while writing POSCAR_OUT" + suffix, exc_info=rp.is_debug_mode) rp.setHaltingLevel(2) if not np.isclose(rp.SYMMETRY_CELL_TRANSFORM, np.identity(2)).all(): tmpslab = sl.make_subcell(rp, rp.SYMMETRY_CELL_TRANSFORM) poscar_mincell_fn = "POSCAR_OUT_mincell" + suffix try: poscar.write(tmpslab, filename=poscar_mincell_fn, silent=silent) except OSError: logger.warning( "Exception occurred while writing POSCAR_OUT_mincell" + suffix, exc_info=rp.is_debug_mode) vibrocc_fn = "VIBROCC_OUT" + suffix try: writeVIBROCC(sl, rp, filename=vibrocc_fn, silent=silent) except Exception: logger.error("Exception occured while writing VIBROCC_OUT" + suffix, exc_info=rp.is_debug_mode) rp.setHaltingLevel(2) return