# Author: Cameron F. Abrams, <cfa22@drexel.edu>
import logging
import os
import shutil
import tempfile
from .tcl import TcLScripter
from ..charmmff.charmmffprm import CharmmParamFile
from ..core.command import Command
from ..logparsers import NAMDLogParser, NAMDxstParser
from ..psfutil.psfcontents import PSFContents
from ..util.progress import NAMDProgress
logger = logging.getLogger(__name__)
[docs]
class NAMDScripter(TcLScripter):
"""
This class extends the TcLScripter class to provide functionality for creating and managing NAMD scripts (which NAMD refers to as "configurations" -- this is not to be confused with Pestifer's Config class).
Parameters
----------
config : Config
The configuration object containing settings for the script.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.charmmff = kwargs.get('charmmff')
assert self.charmmff is not None, 'charmmff must be provided to NAMDScripter'
self.charmmff_config = kwargs.get('charmmff_config')
self.charmrun = kwargs.get('charmrun')
self.gpu_devices = kwargs.get('gpu_devices')
self.local_ncpus = kwargs.get('local_ncpus')
self.namd = kwargs.get('namd')
self.namdgpu = kwargs.get('namd3gpu', self.namd)
self.namd_config = kwargs.get('namd_config')
self.namd_type = kwargs.get('namd_type')
self.namd_version = int(self.namd_config['namd-version'])
logger.debug(f'Using NAMD v {self.namd_version}')
self.ncpus = kwargs.get('ncpus')
self.ngpus = kwargs.get('ngpus')
self.slurmvars = kwargs.get('slurmvars')
if self.namd_version == 2:
self.namd_deprecates = {}
else:
self.namd_deprecates = kwargs.get('namd_deprecates')
logger.debug(f'{self.ncpus} cpus are available for namd')
self.default_ext = '.namd'
[docs]
def fetch_standard_charmm_parameters(self):
"""
Fetch the standard CHARMM parameters from the configuration and copy them to the local directory.
This method retrieves the parameters defined in the CHARMM force field configuration and copies them to
the local directory for use in the NAMD script.
Returns
-------
list
A list of parameter files that have been copied to the local directory.
"""
# logger.debug('Fetching standard CHARMMFF parameters')
parameters_local = []
for t in self.charmmff_config['standard']['prm'] + self.charmmff_config['standard']['str']:
if not t in parameters_local:
self.charmmff.copy_charmmfile_local(t)
parameters_local.append(t)
for t in self.charmmff_config['custom']['prm'] + self.charmmff_config['custom']['str']:
if t not in parameters_local:
self.charmmff.copy_charmmfile_local(t)
parameters_local.append(t)
# logger.debug(f'local parameters: {parameters_local}')
return parameters_local
[docs]
def newscript(self, basename=None, addl_paramfiles=[], skip_standard_params=False):
"""
Initialize a new NAMD script with a specified basename and additional parameter files.
If no basename is provided, a default script name is used.
Parameters
----------
basename : str, optional
The base name for the script file. If not provided, a default name is used.
addl_paramfiles : list, optional
A list of additional parameter files to be included in the script. These files should not contain
a path, but they should be recognizable .prm or .str files within the CHARMM force field or
the custom CHARMM-format directories.
skip_standard_params : bool, optional
If True, skip fetching the standard CHARMM parameter files and treat ``addl_paramfiles``
as the complete parameter file list. Use this when generating a self-contained package
that includes a pre-consolidated minimal ``.prm`` file.
"""
super().newscript(basename)
self.scriptname = f'{basename}{self.default_ext}'
self.banner('NAMD script')
if skip_standard_params:
self.parameters = []
else:
self.parameters = self.fetch_standard_charmm_parameters()
for at in sorted(addl_paramfiles):
if not at in self.parameters:
if not skip_standard_params:
self.charmmff.copy_charmmfile_local(at)
self.parameters.append(at)
for p in self.parameters:
assert os.sep not in p
self.addline(f'parameters {p}')
[docs]
def writescript(self, params, cpu_override=False):
"""
Write the NAMD script based on the provided parameters.
This method constructs the NAMD script by adding lines based on the parameters provided.
If the NAMD type is 'gpu' and `cpu_override` is False, it will also add GPU-specific configurations.
Parameters
----------
params : dict
A dictionary of parameters to include in the NAMD script.
cpu_override : bool, optional
If True, the script will be written with CPU-specific configurations, even if the NAMD type is 'gpu'.
"""
# logger.debug(f'params: {params}')
tailers = ['minimize', 'run', 'numsteps']
gpu_resident_active = self.namd_type == 'gpu' and not cpu_override
gpu_override_keys = {k.lower() for k in self.namd_config['gpu-resident']} if gpu_resident_active else set()
for k, v in params.items():
if k in tailers or k.lower() in gpu_override_keys:
continue
if type(v) == list:
for val in v:
if k == 'tcl':
self.addline(val)
else:
self.addline(f'{self.namd_deprecates.get(k, k)} {val}')
else:
if k == 'tcl':
self.addline(v)
else:
self.addline(f'{self.namd_deprecates.get(k, k)} {v}')
if gpu_resident_active:
for k, v in self.namd_config['gpu-resident'].items():
self.addline(f'{k} {v}')
for t in tailers:
if t in params:
self.addline(f'{self.namd_deprecates.get(t, t)} {params[t]}')
super().writescript()
[docs]
def consolidate_params(self, psf_path: str) -> str | None:
"""Replace the full parameter file set with a single minimal .prm for this PSF.
Reads atom types from *psf_path*, merges all files in ``self.parameters``,
extracts only the records needed for those atom types, writes
``{basename}_minimal.prm``, and rewrites the NAMD script so that its
``parameters`` lines reference only that one file.
Returns the path to the written minimal .prm on success, or None if
consolidation was skipped (e.g. no parameter files, PSF unreadable, or
parse failures on all files).
"""
if not self.parameters or not os.path.exists(psf_path):
return None
atomtypes = set(a.atomtype for a in PSFContents(psf_path).atoms)
logger.debug(f'consolidate_params: {len(atomtypes)} unique atom types in {psf_path}')
combined = CharmmParamFile()
n_parsed = 0
for fname in self.parameters:
try:
combined.merge(CharmmParamFile.from_file(fname))
n_parsed += 1
except Exception as exc:
logger.warning(f'consolidate_params: could not parse {fname}: {exc}')
if n_parsed == 0:
return None
minimal = combined.extract_for_atomtypes(atomtypes)
outname = f'{self.basename}_minimal.prm'
minimal.write(outname, title=f'Minimal CHARMM parameters for {self.basename}')
logger.debug(f'consolidate_params: wrote {outname} ({minimal.summary()})')
# Rewrite script: replace all 'parameters X' lines with the single minimal file
with open(self.scriptname, 'r') as fh:
lines = fh.readlines()
param_lines_replaced = False
new_lines = []
for line in lines:
if line.startswith('parameters '):
if not param_lines_replaced:
new_lines.append(f'parameters {outname}\n')
param_lines_replaced = True
# drop all subsequent 'parameters' lines
else:
new_lines.append(line)
with open(self.scriptname, 'w') as fh:
fh.writelines(new_lines)
self.parameters = [outname]
return outname
def _stage_params_to_local_scratch(self):
"""Copy parameter files to node-local scratch and rewrite the script to use absolute local paths.
Shared-filesystem reads (Lustre/NFS) during Charm++ multi-thread startup can be very slow
under load. Staging to $TMPDIR ensures NAMD reads from local storage regardless of
shared-FS congestion. Has no effect if $TMPDIR is not set or parameters list is empty.
"""
tmpdir = os.environ.get('TMPDIR', '')
if not tmpdir or not self.parameters:
return
scratch = os.path.join(tmpdir, f'namd_params_{os.getpid()}')
os.makedirs(scratch, exist_ok=True)
replacements = {}
for p in self.parameters:
src = os.path.abspath(p)
dst = os.path.join(scratch, p)
if not os.path.exists(dst):
shutil.copy2(src, dst)
replacements[p] = dst
# Rewrite 'parameters <basename>' lines in the already-written script file
with open(self.scriptname, 'r') as fh:
script = fh.read()
for basename, localpath in replacements.items():
script = script.replace(f'parameters {basename}', f'parameters {localpath}')
with open(self.scriptname, 'w') as fh:
fh.write(script)
logger.debug(f'Parameter files staged to local scratch: {scratch}')
[docs]
def runscript(self, **kwargs):
"""
Run the NAMD script using the NAMD command line interface.
This method constructs a command to execute NAMD with the specified script and options.
Parameters
----------
kwargs : dict
A dictionary of options to be passed to the NAMD command. This can include:
- ``local_execution_only``: If True, use the local CPU count instead of the configured CPU count.
- ``single_gpu_only``: If True, use only one GPU device.
- ``cpu_override``: If True, force the use of CPU settings even if the NAMD type is ``gpu``.
"""
assert hasattr(self, 'scriptname'), f'No scriptname set.'
if kwargs.get('single_cpu_only', False):
use_cpu_count = 1
elif kwargs.get('local_execution_only', False):
use_cpu_count = min(self.local_ncpus, self.ncpus)
else:
use_cpu_count = self.ncpus
if kwargs.get('single_gpu_only', False):
use_gpu_count = 1
use_gpu_devices = '0'
else:
use_gpu_count = self.ngpus
use_gpu_devices = self.gpu_devices
logger.debug(f'NAMD using {use_cpu_count} PE(s)')
if self.namd_type == 'cpu' or kwargs.get('cpu_override', False):
if self.slurmvars:
c = Command(f'numactl --interleave=all {self.namd} +p {use_cpu_count} {self.scriptname}')
else:
c = Command(f'{self.charmrun} +p {use_cpu_count} {self.namd} {self.scriptname}')
elif self.namd_type == 'gpu':
if len(self.slurmvars) > 0:
use_cpu_count = 8 if use_gpu_count == 1 else (use_gpu_count - 1) * 8 + 8 - (use_gpu_count - 1)
else:
use_cpu_count = self.local_ncpus
if use_gpu_count > 1:
pmepes = use_cpu_count - (use_gpu_count - 1) * 8
pmepes_flag = f'+pmepes {pmepes} '
else:
pmepes_flag = ''
c = Command(f'{self.namdgpu} +p{use_cpu_count} {pmepes_flag}+setcpuaffinity +devices {use_gpu_devices} {self.scriptname}')
logger.debug(f'NAMD launch command: {c.c}')
if self.slurmvars:
self._stage_params_to_local_scratch()
self.logname = f'{self.basename}.log'
self.logparser = NAMDLogParser(basename=self.basename)
self.logparser.auxparser = NAMDxstParser(basename=f'{self.basename}')
progress_struct = None
if self.progress:
progress_struct = NAMDProgress()
self.logparser.enable_progress_bar(progress_struct)
return c.run(logfile=self.logname, logparser=self.logparser)