# Author: Cameron F. Abrams, <cfa22@drexel.edu>
"""
A class for parsing the config file and creating the :class:`Config` object
Pestifer's user-configuration input uses ycleptic, an enhanced, YAML-based
configuration file manager. The :class:`Config` object is a descendent of the
:class:`Yclept` class. It also houses the :class:`pestifer.core.resourcemanager.ResourceManager` object, which manages
access to the contents of Pestifer's :mod:`pestifer.resources` subpackage.
"""
import os
import logging
import shutil
from GPUtil import getGPUs
from importlib.metadata import version
from importlib.resources import files as pkg_files
from ycleptic import Yclept
from .errors import PestiferError
from .resourcemanager import ResourceManager
from ..tasks.taskcollections import TaskList
from ..util.stringthings import my_logger
from ..scripters import PsfgenScripter, NAMDColvarInputScripter, PackmolScripter, VMDScripter, GenericScripter
from ..scripters.namd import NAMDScripter
from ..scripters.packmol import check_packmol_version
logger = logging.getLogger(__name__)
[docs]
class Config(Yclept):
"""
A class for managing the configuration of Pestifer.
This class extends the :class:`ycleptic.yclept.Yclept` class to provide additional functionality
specific to Pestifer's configuration needs.
Parameters
----------
userfile : str, optional
Path to the user-specific configuration file. If not provided, the default configuration is used.
userdict : dict, optional
A dictionary of user-specific configuration options. If not provided, the default configuration is used.
quiet : bool, optional
If True, suppresses output to the console. Default is False.
RM : ResourceManager
An instance of the ResourceManager class, which manages access to the contents of Pestifer's resources.
basefile : str
Optional name of the Ycleptic-format base file
"""
def __init__(self, userfile='', userdict={}, quiet=False, RM: ResourceManager = None, basefile: str = '', ncpus_override: int = 0):
self.userfile = userfile
self.userdict = userdict
self.quiet = quiet
self.basefile = basefile
self.RM = RM
self.ncpus_override = ncpus_override
self.my_processor_info = ''
self.kwargs_to_scripters = {}
[docs]
def taskless_subconfig(self) -> 'Config':
""" Create a taskless subconfiguration from the progenitor configuration. """
subconfig = self.__class__(quiet=True, RM=self.RM).configure()
subconfig['user']['tasks'] = TaskList([])
return subconfig
[docs]
def get_scripter(self, scripter_name: str):
"""
Get a scripter instance by name.
Parameters
----------
scripter_name : str
The name of the scripter to retrieve. Must be one of 'psfgen', 'namd', 'packmol', 'tcl', 'data', or 'vmd'.
Returns
-------
Scripter
An instance of the requested scripter.
"""
if scripter_name in self.scripters:
return self.scripters[scripter_name]
else:
raise ValueError(f'Scripter {scripter_name} not found.')
def _set_kwargs_to_scripters(self):
"""
Defines a collection of resources sent to all scripters
"""
assert self.RM.charmmff_content is not None, 'ResourceManager must have charmmff_content set.'
self.kwargs_to_scripters = dict(
charmmff = self.RM.charmmff_content,
charmmff_content = self.RM.charmmff_content,
charmmff_config = self['user']['charmmff'],
charmrun = self.shell_commands['charmrun'],
gpu_devices = self.gpu_devices,
local_ncpus = self.local_ncpus,
namd = self.shell_commands['namd3'],
namd3gpu = self.shell_commands['namd3gpu'],
namd_config = self['user']['namd'],
namd_deprecates = self['user']['namd'].get('deprecated3', {}),
namd_type = self.namd_type,
namd_version = self['user']['namd'].get('version', '3.0'),
ncpus = self.ncpus,
ngpus = self.ngpus,
packmol = self.shell_commands['packmol'],
psfgen_config = self['user']['psfgen'],
progress = self.use_terminal_progress,
slurmvars = self.slurmvars,
tcl_root = self.tcl_root,
tcl_pkg_path = self.tcl_pkg_path,
tcl_script_path = self.tcl_script_path,
vmd = self.shell_commands['vmd'],
vmd_startup_script = self.vmd_startup_script,
)
def _set_processor_info(self):
"""
Determine the number of CPUs and GPUs available for this process.
This method checks if the process is running under SLURM (a job scheduler)
and retrieves the number of nodes and tasks per node. If SLURM variables are not set,
it defaults to the local CPU count. It also checks for available GPUs using GPUtil.
If running under SLURM, the return value includes the number of nodes and tasks per node.
If running locally, it includes the local CPU count and the number of GPUs detected.
If no GPUs are detected, it will not mention GPUs in the output.
If GPUs are detected, it will include the number of GPUs and their IDs.
Returns
-------
str
A string summarizing the number of CPUs and GPUs available.
"""
self.slurmvars = {k: os.environ[k] for k in os.environ if 'SLURM' in k}
self.local_ncpus = os.cpu_count()
self.gpus_allocated = ''
self.ngpus = 0
self.gpu_devices = ''
retstr = ''
if self.slurmvars and 'SLURM_NNODES' in self.slurmvars and 'SLURM_NTASKS_PER_NODE' in self.slurmvars:
# we are in a batch execution managed by slurm
nnodes = int(self.slurmvars['SLURM_NNODES'])
ntaskspernode = int(self.slurmvars['SLURM_NTASKS_PER_NODE'])
ncpus = nnodes * ntaskspernode
retstr += f'SLURM: {nnodes} nodes; {ncpus} cpus'
if 'SLURM_JOB_GPUS' in self.slurmvars:
self.gpu_devices = self.slurmvars['SLURM_JOB_GPUS']
self.ngpus = len(self.gpus_allocated.split(','))
ess = 's' if self.ngpus > 1 else ''
retstr += f'; {self.ngpus} gpu{ess}'
else:
retstr += f'Local: {self.local_ncpus} cpus'
ncpus = self.local_ncpus
gpus = getGPUs()
if len(gpus) > 0:
self.ngpus = len(gpus)
self.gpu_devices = ','.join([str(x.id) for x in gpus])
ess = 's' if self.ngpus > 1 else ''
retstr += f'; {self.ngpus} gpu{ess}'
if self.ncpus_override > 0:
ncpus = self.ncpus_override
retstr += f'; will use {ncpus} PEs (--ncpus override)'
else:
user_ncpus = self['user']['namd'].get('ncpus', 0)
if user_ncpus > 0:
ncpus = user_ncpus
retstr += f'; will use {ncpus} PEs (config-specified)'
else:
retstr += f'; will use {ncpus} PEs (auto-detected)'
self.ncpus = ncpus
return retstr
def _set_shell_commands(self, verify_access=True):
""" Defines all shell commands used by Pestifer """
required_commands = ['charmrun', 'namd3', 'vmd', 'catdcd', 'packmol']
command_alternates = {'namd3': 'namd2'}
self.shell_commands = {}
for rq in required_commands:
self.shell_commands[rq] = self['user']['paths'][rq]
rq_resolved = shutil.which(self.shell_commands[rq])
rq_alt = command_alternates.get(rq, None)
if not rq_resolved and not rq_alt:
raise PestiferError(f'Cannot find or execute required command {self.shell_commands[rq]!r}.')
if not rq_resolved:
if rq in command_alternates:
rqalt = command_alternates[rq]
self.shell_commands[rq] = self['user']['paths'][rqalt]
altrq_resolved = shutil.which(self.shell_commands[rq])
if altrq_resolved:
logger.info(f'Using alternate command {self.shell_commands[rq]} for {rq}.')
rq_resolved = altrq_resolved
else:
raise PestiferError(f'Cannot find or execute required command {self.shell_commands[rq]!r} or alternate {self.shell_commands[rqalt]!r}.')
else:
raise PestiferError(f'Cannot find or execute required command {self.shell_commands[rq]!r}.')
if rq_resolved is not None and verify_access:
assert os.access(rq_resolved, os.X_OK), f'You do not have permission to execute {rq_resolved}'
if verify_access:
try:
check_packmol_version(self.shell_commands['packmol'])
except RuntimeError as e:
raise PestiferError(str(e)) from e
namd3_path = self.shell_commands['namd3']
namd3gpu_path = self['user']['paths']['namd3gpu']
self.shell_commands['namd3gpu'] = namd3gpu_path
if namd3gpu_path != namd3_path:
namd3gpu_resolved = shutil.which(namd3gpu_path)
if namd3gpu_resolved:
if verify_access:
assert os.access(namd3gpu_resolved, os.X_OK), f'You do not have permission to execute {namd3gpu_resolved}'
self.namd_type = 'gpu'
else:
logger.warning(f'namd3gpu {namd3gpu_path!r} not found; falling back to CPU mode')
self.shell_commands['namd3gpu'] = namd3_path
self.namd_type = 'cpu'
else:
self.namd_type = 'cpu'
self.namd_deprecates = self['user']['namd']['deprecated3']
def _set_internal_shortcuts(self):
self.use_terminal_progress = len(self.slurmvars) == 0
RM = self.RM
self.tcl_root = RM.get_tcldir()
assert os.path.exists(self.tcl_root)
self.tcl_pkg_path = RM.get_tcl_pkgdir()
assert os.path.exists(self.tcl_pkg_path)
self.tcl_script_path = RM.get_tcl_scriptsdir()
assert os.path.exists(self.tcl_script_path)
self.vmd_startup_script = os.path.join(self.tcl_root, 'vmdrc.tcl')
assert os.path.exists(self.vmd_startup_script)
self.namd_config_defaults = self['user']['namd']
self.segtypes = RM.labels.segtypes
if self['user']['psfgen']['segtypes']:
RM.labels.update_segtypes(self['user']['psfgen']['segtypes'])
# logger.debug(f'segtypes: {self.segtypes}')
for atom_alias in RM.labels.aliases['atom']:
if atom_alias not in self['user']['psfgen']['aliases']['atom']:
# add the atom alias to the user config
self['user']['psfgen']['aliases']['atom'].append(atom_alias)
for residue_alias in RM.labels.aliases['residue']:
if residue_alias not in self['user']['psfgen']['aliases']['residue']:
# add the residue alias to the user config
self['user']['psfgen']['aliases']['residue'].append(residue_alias)
RM.labels.update_aliases(residue_aliases=self['user']['psfgen']['aliases']['residue'],
atom_aliases=self['user']['psfgen']['aliases']['atom'])
# logger.debug(f'psfgen aliases: {self["user"]["psfgen"]["aliases"]}')