# Source code for mispr.gaussian.firetasks.run_calc

"""Define firetasks for running Gaussian calculations."""

import os
import shutil
import logging
import subprocess

from timeit import default_timer as timer
from configparser import ConfigParser

import numpy as np

from monty.os.path import zpath
from monty.serialization import loadfn

from pymatgen.io.gaussian import GaussianInput

from fireworks.fw_config import CONFIG_FILE_DIR
from fireworks.core.firework import Firework, FWAction, FiretaskBase
from fireworks.utilities.fw_utilities import explicit_serialize

from custodian import Custodian
from custodian.gaussian.jobs import GaussianJob
from custodian.gaussian.handlers import WallTimeErrorHandler, GaussianErrorHandler

from mispr.gaussian.defaults import CUSTODIAN_MAX_ERRORS
from mispr.gaussian.utilities.misc import recursive_compare_dicts

__author__ = "Rasha Atwi"
__maintainer__ = "Rasha Atwi"
__email__ = "rasha.atwi@stonybrook.edu"
__status__ = "Development"
__date__ = "Jan 2021"
__version__ = "0.0.4"

logger = logging.getLogger(__name__)


@explicit_serialize
class RunGaussianDirect(FiretaskBase):
    """
    Run Gaussian by invoking its command line directly (custodian is not used).

    Args:
        input_file (str, optional): Name of the Gaussian input file. Defaults to
            "mol.com".
        output_file (str, optional): Name of the Gaussian output file. Defaults
            to "mol.out".
        gaussian_cmd (str, optional): Full command to execute; when it is not
            given, the ``gcmd`` entry of the ``RunCalc`` section of
            ``config.ini`` in the FireWorks config directory is used.
    """

    required_params = []
    optional_params = ["input_file", "output_file", "gaussian_cmd"]

    def run_task(self, fw_spec):
        """
        Execute the Gaussian command in the current directory and time it.

        The ``$input_path$`` and ``$output_path$`` placeholders of the command
        are substituted with the input/output file paths joined onto the
        current working directory. The elapsed time is stored in
        ``fw_spec["run_time"]``; the return code is only logged.

        Args:
            fw_spec (dict): Firework spec; updated in place with "run_time".
        """
        cwd = os.getcwd()
        gin_path = os.path.join(cwd, self.get("input_file", "mol.com"))
        gout_path = os.path.join(cwd, self.get("output_file", "mol.out"))

        command = self.get("gaussian_cmd")
        if not command:
            # Fall back to the command template stored in the config file.
            config = ConfigParser()
            config.read(CONFIG_FILE_DIR + "/config.ini")
            command = config["RunCalc"]["gcmd"]

        substitutions = (
            ("$input_path$", gin_path),
            ("$output_path$", gout_path),
        )
        for placeholder, path in substitutions:
            command = command.replace(placeholder, path)

        logger.info(f"Running command: {command}")
        started = timer()
        # The command is a shell string (it is run with shell=True as-is).
        exit_code = subprocess.call(command, shell=True)
        elapsed = timer() - started
        logger.info(f"Finished running with return code: {exit_code}")
        fw_spec["run_time"] = elapsed
@explicit_serialize
class RunGaussianCustodian(FiretaskBase):
    """
    Run Gaussian using custodian.

    Args:
        input_file (str, optional): Name of the Gaussian input file. Defaults to
            "mol.com".
        output_file (str, optional): Name of the Gaussian output file. Defaults
            to "mol.out".
        gaussian_cmd (str, optional): Name of the full executable to run; if not
            provided, will attempt to find the command in the config file.
        stderr_file (str, optional): Name of the file to direct standard error
            to. Defaults to "stderr.txt".
        job_type (str, optional): Type of job to run; supported options are
            (1) normal and (2) better_guess. Defaults to "normal".
        backup (bool, optional): Whether to backup the initial input file; if
            True, the input will be copied with a ".orig" appended. Defaults to
            True.
        scf_max_cycles (int, optional): Maximum number of SCF cycles to run;
            defaults to 100.
        opt_max_cycles (int, optional): Maximum number of optimization cycles to
            run; defaults to 100.
        cart_coords (bool, optional): Whether to use cartesian coordinates;
            defaults to True.
        max_errors (int, optional): Maximum number of errors to handle before
            giving up. Defaults to the number specified in
            ``mispr.gaussian.defaults.py``.
        lower_functional (str, optional): Lower level of theory to use if the
            optimization fails and job_type is set to "better_guess"; this will
            attempt to generate a better initial guess of the geometry before
            running the job again at the higher level of theory.
        lower_basis_set (str, optional): Less expensive basis set to use if the
            optimization fails and job_type is set to "better_guess"; this will
            attempt to generate a better initial guess of the geometry before
            running the job again at the higher level of theory.
        prefix (str, optional): Prefix to the files. Defaults to error, which
            means a series of error.1.tar.gz, error.2.tar.gz, ... will be
            generated.
        suffix (str, optional): A suffix to be appended to the final output;
            e.g., to rename all Gaussian output from mol.out to mol.out.1,
            provide ".1" as the suffix. Only passed to the job when job_type is
            "normal".
        check_convergence (bool, optional): Whether to check convergence in an
            optimization job; this will also generate a plot with the
            convergence criteria as a function of the number of iterations.
            Defaults to True.
        wall_time (int, optional): Wall time set to the job in seconds; if
            provided, will add the ``WallTimeErrorHandler``, which will restart
            the job if it hits the wall time limit.
        buffer_time (int, optional): Buffer time set to the job in seconds; if
            the remaining time for the job = buffer_time, the
            ``WallTimeErrorHandler`` will cancel the job and restart it; this is
            done because if the job hits wall time on its own and is cancelled,
            it will no longer be possible to restart it. Defaults to 300
            seconds.
        max_wall_time_corrections (int, optional): Maximum number of wall time
            corrections to make. Defaults to 3.
        additional_fw (optional): Accepted as a parameter but not read by
            ``run_task``.
    """

    required_params = []
    optional_params = [
        "input_file",
        "output_file",
        "gaussian_cmd",
        "stderr_file",
        "job_type",
        "backup",
        "scf_max_cycles",
        "opt_max_cycles",
        "cart_coords",
        "max_errors",
        "lower_functional",
        "lower_basis_set",
        "prefix",
        "suffix",
        "check_convergence",
        "wall_time",
        "buffer_time",
        "max_wall_time_corrections",
        "additional_fw",
    ]

    def run_task(self, fw_spec):
        """
        Build the custodian job(s) and error handlers and run them.

        On success the elapsed time is stored in ``fw_spec["run_time"]``. If
        custodian raises and ``custodian.json`` records a ``wall_time_limit``
        error, a copy of the current firework is returned as a detour (subject
        to ``max_wall_time_corrections``); otherwise the original exception is
        re-raised.

        Args:
            fw_spec (dict): Firework spec; read for "overwrite_input_file" and
                "number_of_wall_time_corrections", updated with "run_time".

        Returns:
            FWAction or None: A detour action when the job is resubmitted after
            a wall time error, otherwise None.

        Raises:
            ValueError: If ``job_type`` is unsupported, or is "better_guess"
                without both ``lower_functional`` and ``lower_basis_set``.
            Exception: Whatever custodian raised, when no restart is made.
        """
        input_file = self.get("input_file", "mol.com")
        # A wall-time restart stores the input file to run in the spec (see
        # _wall_time_detour); the command then reads that file instead.
        wt_input_file = fw_spec.get("overwrite_input_file", input_file)
        output_file = self.get("output_file", "mol.out")
        backup = self.get("backup", True)
        prefix = self.get("prefix", "error")
        stderr_file = self.get("stderr_file", "stderr.txt")
        scf_max_cycles = self.get("scf_max_cycles", 100)
        opt_max_cycles = self.get("opt_max_cycles", 100)
        max_errors = self.get("max_errors", CUSTODIAN_MAX_ERRORS)
        job_type = self.get("job_type", "normal")
        lower_functional = self.get("lower_functional", None)
        lower_basis_set = self.get("lower_basis_set", None)
        cart_coords = self.get("cart_coords", True)
        check_convergence = self.get("check_convergence", True)
        wall_time = self.get("wall_time", None)
        buffer_time = self.get("buffer_time", 300)

        cmd = self.get("gaussian_cmd")
        if not cmd:
            cfg = ConfigParser()
            cfg.read(os.path.join(CONFIG_FILE_DIR, "config.ini"))
            cmd = cfg["RunCalc"]["gcmd"]
        cmd = cmd.replace("$input_path$", wt_input_file).replace(
            "$output_path$", output_file
        )

        if job_type == "normal":
            jobs = [
                GaussianJob(
                    gaussian_cmd=cmd,
                    input_file=input_file,
                    output_file=output_file,
                    stderr_file=stderr_file,
                    suffix=self.get("suffix", ""),
                    backup=backup,
                )
            ]
        elif job_type == "better_guess":
            if not lower_functional or not lower_basis_set:
                raise ValueError(
                    f"{job_type} is requested but the functional "
                    f"and/or basis set to use for the SCF "
                    f"calculation are not provided! Exiting..."
                )
            jobs = GaussianJob.generate_better_guess(
                gaussian_cmd=cmd,
                input_file=input_file,
                output_file=output_file,
                stderr_file=stderr_file,
                backup=backup,
                cart_coords=cart_coords,
                directory=os.getcwd(),
            )
        else:
            raise ValueError(f"Unsupported job type: {job_type}")

        handlers = [
            GaussianErrorHandler(
                input_file=input_file,
                output_file=output_file,
                stderr_file=stderr_file,
                cart_coords=cart_coords,
                scf_max_cycles=scf_max_cycles,
                opt_max_cycles=opt_max_cycles,
                job_type=job_type,
                lower_functional=lower_functional,
                lower_basis_set=lower_basis_set,
                prefix=prefix,
                check_convergence=check_convergence,
            )
        ]
        if wall_time:
            handlers.append(
                WallTimeErrorHandler(
                    wall_time=wall_time,
                    buffer_time=buffer_time,
                    input_file=input_file,
                    output_file=output_file,
                    stderr_file=stderr_file,
                    prefix=prefix,
                )
            )

        c = Custodian(handlers, jobs, max_errors=max_errors)

        st = timer()
        try:
            return_code = c.run()
        except Exception:
            if self._hit_wall_time_limit():
                detour = self._wall_time_detour(fw_spec, input_file)
                if detour is not None:
                    return detour
            # Bare raise keeps the original exception and traceback intact.
            raise
        run_time = timer() - st
        logger.info("Finished running with return code: %s", return_code)
        fw_spec["run_time"] = run_time

    @staticmethod
    def _hit_wall_time_limit():
        """Return True if custodian.json records a ``wall_time_limit`` error."""
        # Resolve the (possibly compressed) file name once and use it for every
        # check, so a compressed-only file does not break the size lookup.
        custodian_file = zpath("custodian.json")
        if not os.path.exists(custodian_file):
            return False
        if os.path.getsize(custodian_file) == 0:
            return False
        custodian_data = loadfn(custodian_file)
        for entry in custodian_data:
            # Tolerate entries without corrections / corrections without errors.
            for correction in entry.get("corrections") or []:
                if "wall_time_limit" in (correction.get("errors") or []):
                    return True
        return False

    def _wall_time_detour(self, fw_spec, input_file):
        """Return a detour FWAction re-running this firework, or None."""
        n_corrections = fw_spec.get("number_of_wall_time_corrections", 0)
        # NOTE(review): the counter starts at 0, so this comparison allows
        # max_wall_time_corrections + 1 restarts - confirm that is intended.
        if n_corrections > self.get("max_wall_time_corrections", 3):
            return None

        # launchpad/fw_id are read as attributes of the task; presumably they
        # are only set when "_add_launchpad_and_fw_id" is True in the spec -
        # verify against the FireWorks docs. Without them no restart is
        # possible, so let the caller re-raise the original error.
        launchpad = getattr(self, "launchpad", None)
        fw_id = getattr(self, "fw_id", None)
        if launchpad is None or fw_id is None:
            logger.warning(
                "Wall time limit hit but launchpad/fw_id are not available "
                "on the task; cannot resubmit the job."
            )
            return None

        print("correction number:", n_corrections)
        fw = launchpad.get_fw_by_id(fw_id)
        fw.spec.update(
            {
                "_recovery": launchpad.get_recovery(fw_id),
                "number_of_wall_time_corrections": n_corrections + 1,
                "overwrite_input_file": input_file + ".wt",
            }
        )
        new_fw = Firework(fw.tasks, fw.spec, fw.name)
        return FWAction(detours=[new_fw])
@explicit_serialize
class RunGaussianFake(FiretaskBase):
    """
    Emulate a Gaussian run by copying files from a reference directory.

    Args:
        ref_dir (str): Path to the reference Gaussian run directory holding the
            input and output files.
        working_dir (str, optional): Directory where the fake calculation is
            run. Defaults to the current working directory.
        input_file (str, optional): Name of the input file (used for both the
            reference input and the new input). Defaults to mol.com.
        tolerance (float, optional): Absolute tolerance used when comparing the
            cartesian coordinates of the reference and user input. Defaults to
            0.0001.
    """

    required_params = ["ref_dir"]
    optional_params = ["working_dir", "input_file", "tolerance"]

    def run_task(self, fw_spec):
        """Check the input against the reference, remove it, copy the outputs."""
        self._verify_inputs()
        self._clear_inputs()
        self._generate_outputs()

    @staticmethod
    def _recursive_lowercase(obj):
        """
        Return a lower-cased copy of ``obj``.

        Dict keys and string values are lower-cased, other iterables are turned
        into lists of lower-cased items, and anything else is returned as is.
        """
        lower = RunGaussianFake._recursive_lowercase
        if isinstance(obj, dict):
            return {key.lower(): lower(value) for key, value in obj.items()}
        if isinstance(obj, str):
            return obj.lower()
        if hasattr(obj, "__iter__"):
            return [lower(item) for item in obj]
        return obj

    def _verify_inputs(self):
        """
        Compare the user's Gaussian input with the reference input.

        Raises:
            AssertionError: If species or cartesian coordinates differ.
            ValueError: If any other part of the two inputs differs.
        """
        reference_dir = self["ref_dir"]
        run_dir = self.get("working_dir", os.getcwd())
        input_name = self.get("input_file", "mol.com")

        user_input = GaussianInput.from_file(f"{run_dir}/{input_name}")
        ref_input = GaussianInput.from_file(f"{reference_dir}/{input_name}")
        atol = self.get("tolerance") or 0.0001

        # Molecules are compared numerically; everything else via dicts below.
        np.testing.assert_equal(
            ref_input.molecule.species, user_input.molecule.species
        )
        np.testing.assert_allclose(
            ref_input.molecule.cart_coords,
            user_input.molecule.cart_coords,
            atol=atol,
        )

        settings = []
        for gaussian_input in (ref_input, user_input):
            lowered = self._recursive_lowercase(gaussian_input.as_dict())
            lowered.pop("molecule")
            settings.append(lowered)
        ref_settings, user_settings = settings

        mismatch = recursive_compare_dicts(
            ref_settings, user_settings, "ref_dict", "user_dict"
        )
        if mismatch:
            raise ValueError(
                f"Gaussian input is inconsistent with reference input!\n{mismatch}!"
            )
        logger.info("RunGausianFake: verified input successfully")

    def _clear_inputs(self):
        """Delete the input file from the working directory if it exists."""
        run_dir = self.get("working_dir", os.getcwd())
        input_name = self.get("input_file", "mol.com")
        input_path = f"{run_dir}/{input_name}"
        if not os.path.exists(input_path):
            return
        os.remove(input_path)

    def _generate_outputs(self):
        """Copy every regular file of the reference directory to the working directory."""
        reference_dir = self["ref_dir"]
        run_dir = self.get("working_dir", os.getcwd())
        for name in os.listdir(reference_dir):
            source = f"{reference_dir}/{name}"
            if not os.path.isfile(source):
                continue
            shutil.copy(source, run_dir)
        logger.info("RunGaussianFake: ran fake Gaussian, generated outputs")