# Author: Cameron F. Abrams, <cfa22@drexel.edu>
"""
Definition of the :class:`PSFContents` class for parsing and managing PSF topology files.
This class reads a PSF file, extracts topology information such as atoms, bonds, angles,
dihedrals, and patches, and provides methods for accessing and manipulating this data."""
import logging
import numpy as np
import os
from collections import UserList
from dataclasses import dataclass, field
from pestifer.core.labels import Labels
from .psfangle import PSFAngleList
from .psfatom import PSFAtom, PSFAtomList
from .psfbond import PSFBondList
from .psfpairex import PSFPairExList
from .psfdihedral import PSFDihedralList
from .psfremark import PSFRemark, PSFRemarkList, PSFSegmentRemark, PSFSegmentRemarkList
from .psftopoelement import LineList
from .psfpatch import PSFDISUPatch, PSFLinkPatch
from ..molecule.residue import Residue, ResidueList
from ..objs.link import Link, LinkList
from ..objs.patch import Patch, PatchList
from ..objs.resid import ResID
from ..objs.ssbond import SSBond, SSBondList
from ..util.spinner_wrapper import with_spinner
logger = logging.getLogger(__name__)
[docs]
def get_toppar_from_psf(filename: str):
"""
Extracts the topology file names from a PSF file.
This function reads the PSF file and looks for lines that contain topology information,
specifically those that start with ' REMARKS topology'. It collects the unique topology file names
and returns them as a list. Such lines are written by the psfgen program, so this function expects
to operate on a PSF file generated by psfgen.
"""
lines=[]
with open(filename,'r') as f:
for line in f:
if '!NATOM' in line:
break
lines.append(line)
toppars=[]
for l in lines:
if l[0:17]==' REMARKS topology':
tokens=l.split()
top=tokens[2]
if 'toppar_' in top:
fn=os.path.basename(top)
toppars.append(fn)
return list(set(toppars))
[docs]
@dataclass
class PSFResidue:
resname: str = ''
resid: ResID = field(default_factory=ResID)
segname: str = ''
atoms: PSFAtomList = field(default_factory=PSFAtomList)
segtype: str = ''
[docs]
def add_atom(self, atom: PSFAtom):
if self.resid == atom.resid and self.segname == atom.segname:
self.atoms.append(atom)
return True
return False
[docs]
class PSFResidueList(UserList[PSFResidue]):
@classmethod
def _from_residuegrouped_atomlist(cls, atoms: PSFAtomList):
residues = PSFResidueList()
for atom in atoms.data:
current_residue = None if len(residues) == 0 else residues[-1]
if not current_residue or not current_residue.add_atom(atom):
new_residue = PSFResidue(
resname=atom.resname,
resid=atom.resid,
segname=atom.segname,
segtype=atom.segtype,
atoms=PSFAtomList([atom])
)
residues.append(new_residue)
return residues
[docs]
def apply_segtypes(self):
"""
Apply segment types to residues based on their residue names.
This method uses the :attr:`~pestifer.core.labels.Labels.segtype_of_resname` mapping to assign segment types to
residues based on their residue names. It updates the :attr:`~pestifer.molecule.residue.Residue.segtype` attribute of each residue
in the residue list.
"""
# logger.debug(f'residuelist:apply_segtypes {segtype_of_resname}')
for residue in self.data:
residue.segtype = Labels.segtype_of_resname.get(residue.resname, 'unknown')
[docs]
@dataclass
class PSFSegment:
segname: str = ''
residues: PSFResidueList = field(default_factory=PSFResidueList)
commands: list[str] = field(default_factory=list)
segtype: str = ''
[docs]
def add_residue(self, residue: Residue):
if self.segname == residue.segname:
self.residues.append(residue)
return True
return False
[docs]
def validate(self):
# ensure that in this segment there are no repeated resid's; if so, raise a ValueError
# ensure that all atoms and residues have the same segtype; if not, raise a ValueError
resid_set = set()
for residue in self.residues:
if residue.resid in resid_set:
raise ValueError(f"Duplicate resid found in segname {self.segname}: {residue.resid}")
resid_set.add(residue.resid)
if residue.segtype != self.segtype:
raise ValueError(f"Residue {residue.resid} in segname {self.segname} has segtype {residue.segtype}, expected {self.segtype}")
for atom in residue.atoms.data:
if atom.segtype != self.segtype:
raise ValueError(f"Atom {atom.serial} in resid {residue.resid} in segname {self.segname} has segtype {atom.segtype}, expected {self.segtype}")
[docs]
class PSFSegmentList(UserList[PSFSegment]):
@classmethod
def _from_atom_list(cls, atoms: PSFAtomList):
segments = PSFSegmentList()
for atom in atoms.data:
# logger.debug(f'Considering atom {atom.serial} in seg {atom.segname} resid {atom.resid}...')
existing_segment = next((s for s in segments if s.segname == atom.segname), None)
if existing_segment:
# see if residue exists
existing_residue = next((r for r in existing_segment.residues if r.resid == atom.resid), None)
if existing_residue:
existing_residue.atoms.append(atom)
else:
new_residue = PSFResidue(
resname=atom.resname,
resid=atom.resid,
segname=atom.segname,
atoms=PSFAtomList([atom]),
segtype=Labels.segtype_of_resname[atom.resname]
)
existing_segment.residues.append(new_residue)
else:
new_segment = PSFSegment(
segname=atom.segname,
segtype=atom.segtype,
residues=PSFResidueList([PSFResidue(
resname=atom.resname,
resid=atom.resid,
segname=atom.segname,
atoms=PSFAtomList([atom]),
segtype=Labels.segtype_of_resname[atom.resname]
)])
)
segments.append(new_segment)
return segments
@classmethod
def _from_segmentgrouped_residuelist(cls, residues: PSFResidueList):
segments = PSFSegmentList()
for residue in residues.data:
current_segment = None if len(segments) == 0 else segments[-1]
if current_segment and current_segment.add_residue(residue):
continue
new_segment = PSFSegment(
segname=residue.segname,
segtype=residue.segtype,
residues=PSFResidueList([residue])
)
segments.append(new_segment)
return segments
[docs]
def validate(self):
for segment in self.data:
segment.validate()
[docs]
def num_residues(self):
return sum(len(segment.residues) for segment in self.data)
[docs]
def num_atoms(self):
return sum(len(residue.atoms) for segment in self.data for residue in segment.residues)
[docs]
class PSFContents:
"""
A class for parsing and managing PSF topology files.
This class reads a PSF file, extracts topology information such as atoms, bonds, angles,
dihedrals, and patches, and provides methods for accessing and manipulating this data.
Attributes
----------
atoms : PSFAtomList
A list of atoms parsed from the PSF file, represented as instances of the :class:`PSFAtom` class.
atomserials : list
A list of serial numbers corresponding to the atoms in the PSF file.
ssbonds : SSBondList
A list of disulfide bonds parsed from the PSF file, represented as instances of the :class:`SSBond <pestifer.objs.ssbond.SSBond>` class. These appear in psfgen-generated PSF files as patches of type 'DISU'.
links : LinkList
A list of links parsed from the PSF file, represented as instances of the :class:`Link <pestifer.objs.link.Link>` class. These appear in psfgen-generated PSF files as patches recognized as creating covalent bonds between atoms in two different residues OTHER than disulfides.
patches : dict
A dictionary containing patches defined in the PSF file, where keys are patch types (e.g., 'NTER', 'CTER') and values are lists of patch definitions.
bonds : PSFBondList, optional
A list of bonds parsed from the PSF file, represented as instances of the :class:`PSFBond <.psfbond.PSFBond>` class.
This attribute is only set if the ``parse_topology`` parameter includes ``bonds``.
angles : PSFAngleList, optional
A list of angles parsed from the PSF file, represented as instances of the :class:`PSFAngle <.psfangle.PSFAngle>` class.
This attribute is only set if the ``parse_topology`` parameter includes ``angles``.
dihedrals : PSFDihedralList, optional
A list of dihedrals parsed from the PSF file, represented as instances of the :class:`PSFDihedral <.psfdihedral.PSFDihedral>` class.
This attribute is only set if the ``parse_topology`` parameter includes ``dihedrals``.
pairex : PSFPairExList, optional
A list of cross-topology non-bonded pair exclusions parsed from the PSF file, represented as instances of the :class:`PSFPairEx <.psfpairex.PSFPairEx>` class.
Used in the dual-topology alchemical free energy paradigm to suppress interactions between atoms belonging to opposite end-states.
This attribute is only set if the ``parse_topology`` parameter includes ``pairex``.
Parameters
----------
filename : str
The path to the PSF file to be read.
topology_segtypes : list, optional
A list of segment types to include in the topology parsing. If provided, only atoms of these segment types will be included in the topology.
Default is an empty list, which means all atoms will be included.
parse_topology : list, optional
A list of topology elements to parse from the PSF file. Possible values are 'bonds', 'angles', 'dihedrals', 'impropers', and 'pairex'.
If provided, the class will parse the specified topology elements from the PSF file.
Default is an empty list, which means no topology elements will be parsed.
"""
def __init__(self, filename: str, topology_segtypes: list[str] = [], parse_topology: list[str] = []):
with open(filename,'r') as f:
psflines = f.read().split('\n')
logger.debug(f'{filename}: {len(psflines)} lines.')
self.token_idx = {}
self.token_count = {}
self.patches = {}
# scan for important locations in the file
for i, l in enumerate(psflines):
toktst = [x.strip() for x in l.split()]
if len(toktst) >= 2 and toktst[1][0] == '!':
token_name = toktst[1][2:]
if token_name[-1] == ':':
token_name = token_name[:-1]
self.token_idx[token_name] = i
self.token_count[token_name] = int(toktst[0])
self._psf_header = psflines[0]
self._section_headers = {k: psflines[v] for k, v in self.token_idx.items()}
self.token_lines = {}
for (k, l0), l1 in zip(self.token_idx.items(), list(self.token_idx.values())[1:] + [len(psflines)]):
fl = l0 + 1
ll = l1 - 1
self.token_lines[k] = psflines[fl:ll]
logger.debug(f'{len(self.token_lines)} tokensets:')
logger.debug(f'{", ".join([x for x in self.token_lines.keys()])}')
self.remarks = PSFRemarkList([PSFRemark.from_remarkline(x) for x in self.token_lines.get('TITLE', [])])
logger.debug(f'{len(self.remarks)} remarks')
for r in self.remarks:
logger.debug(f'Remark: {r.remarkline} data type {type(r.data)}')
self.atoms = PSFAtomList([PSFAtom(x) for x in self.token_lines['ATOM']])
self.residues = PSFResidueList._from_residuegrouped_atomlist(self.atoms)
self.segments = PSFSegmentList._from_segmentgrouped_residuelist(self.residues)
self.segmentremarks = self.remarks.get_segmentremarks()
self.segments.remarkify(self.segmentremarks)
self.segments.validate()
self.patchremarks = self.remarks.get_patchremarks()
self.ssbonds = SSBondList([SSBond(p.data) for p in self.patchremarks if isinstance(p.data, PSFDISUPatch)])
self.links = LinkList([Link(p.data) for p in self.patchremarks if isinstance(p.data, PSFLinkPatch)])
# Terminus-management patches (and their undo counterparts) are always handled
# by psfgen's first/last segment directives or the intra-segmental loop-gap
# mechanism. Re-propagating them from a continuation PSF causes duplicate bonds
# (e.g., XCTR applied to a residue whose C=O was never removed by CTER in the
# new build adds a second C-O bond, producing a degenerate angle C-O-C).
_terminus_skip = (set(Patch._in_segment_Npatches) |
set(Patch._in_segment_Cpatches) |
{'XCTR', 'XNTR', 'XGLP', 'XPRP'})
self.generic_patches = PatchList([])
for p in self.patchremarks:
if p.data is not None:
continue
if len(p.patchspecs) != 1:
logger.debug(f'Skipping multi-spec generic patch {p.patchname} {p.patchspecs}')
continue
if p.patchname in _terminus_skip:
logger.debug(f'Skipping terminus-management patch {p.patchname} from generic_patches')
continue
spec = p.patchspecs[0]
try:
segname, resid_str = spec.split(':')
except ValueError:
logger.warning(f'Cannot parse patch spec {spec!r} for patch {p.patchname}')
continue
use_after_regenerate = p.patchname in Patch._after_regenerate_patches
self.generic_patches.append(Patch(
patchname=p.patchname,
chainID=segname,
resid=ResID(resid_str),
use_in_segment='',
use_after_regenerate=use_after_regenerate,
))
self.segnames = [seg.segname for seg in self.segments.data]
self.atomserials = [x.serial for x in self.atoms.data]
logger.debug(f'{len(self.atoms)} atoms')
logger.debug(f'{len(self.segments)} segments: {self.segnames}')
logger.debug(f'{len(self.ssbonds)} disulfide bonds')
logger.debug(f'{len(self.links)} special covalent links')
logger.debug(f'{len(self.generic_patches)} generic single-residue patches')
if parse_topology:
include_serials = []
if topology_segtypes:
self.included_atoms = PSFAtomList([])
for segtype in topology_segtypes:
sublist = self.atoms.filter(lambda x: x.segtype == segtype)
logger.debug(f'{len(sublist)} atoms of segtype {segtype}...')
self.included_atoms.extend(sublist)
include_serials = [x.segtype in topology_segtypes for x in self.atoms.data]
logger.debug(f'Including {include_serials.count(True)}/{len(include_serials)} topologically active atoms ({len(self.included_atoms)}) from segtypes {topology_segtypes}')
if 'bonds' in parse_topology:
self.bonds = PSFBondList(LineList(self.token_lines['BOND']),include_serials=include_serials)
logger.debug(f'Creating graph from {len(self.bonds)} bonds...')
self.G = self.bonds.to_graph()
logger.debug(f'extending atom instances with ligands...')
self.add_ligands()
logger.debug(f'done')
logger.debug(f'Parsed {len(self.bonds)} bonds.')
if 'angles' in parse_topology:
self.angles = PSFAngleList(LineList(self.token_lines['THETA']),include_serials=include_serials)
if 'dihedrals' in parse_topology:
self.dihedrals = PSFDihedralList(LineList(self.token_lines['PHI']),include_serials=include_serials)
if 'impropers' in parse_topology:
self.dihedrals = PSFDihedralList(LineList(self.token_lines['IMPHI']),include_serials=include_serials)
if 'pairex' in parse_topology:
if self.token_count.get('NB', 0) > 0:
self.pairex = PSFPairExList(LineList(self.token_lines['NB']), include_serials=include_serials)
else:
self.pairex = PSFPairExList([])
logger.debug(f'Parsed {len(self.pairex)} non-bonded pair exclusions.')
[docs]
def apply_atom_logics(self, inclusion_logics: list[str] = [], exclusion_logics: list[str] = []):
"""
Apply inclusion and exclusion logic to the atoms in the PSF contents.
Parameters
----------
inclusion_logics : list[str], optional
A list of inclusion logic strings to apply to the atoms.
exclusion_logics : list[str], optional
A list of exclusion logic strings to apply to the atoms.
"""
psf_inclusion_logics = []
psf_exclusion_logics = []
for i in inclusion_logics:
psf_inclusion_logics.append(i.replace('chainID', 'segname'))
for e in exclusion_logics:
psf_exclusion_logics.append(e.replace('chainID', 'segname'))
ignored_atom_count = self.atoms.apply_inclusion_logics(psf_inclusion_logics)
ignored_atom_count += self.atoms.apply_exclusion_logics(psf_exclusion_logics)
if ignored_atom_count > 0:
logger.debug(f'Ignored {ignored_atom_count} atoms from PSFContents based on atom logic')
logger.debug(f' -> Remaining atoms: {len(self.atoms)}')
self.segments = PSFSegmentList._from_atom_list(self.atoms)
logger.debug(f' -> New segments: {[x.segname for x in self.segments]}')
self.segmentremarks = self.remarks.get_segmentremarks()
self.segments.remarkify(self.segmentremarks)
self.segments.validate()
return ignored_atom_count
[docs]
def remove_ignored_residues(self, ignored_residues: ResidueList):
"""
Remove ignored residues from the PSF contents.
Parameters
----------
ignored_residues : ResidueList
A list of residues to remove from the PSF contents.
"""
number_atoms_ignored = 0
for residue in ignored_residues:
for atom in residue.atoms.data:
serial = atom.serial
ignored_atom = self.atoms.get(lambda x: x.serial == serial)
try:
self.atoms.remove(ignored_atom)
number_atoms_ignored += 1
except:
raise(ValueError(f'Could not remove ignored atom {ignored_atom}'))
if number_atoms_ignored > 0:
logger.debug(f'Ignored {number_atoms_ignored} atoms from PSFContents based modifications to PDB-derived residue list')
self.segments = PSFSegmentList._from_atom_list(self.atoms)
self.segmentremarks = self.remarks.get_segmentremarks()
self.segments.remarkify(self.segmentremarks)
self.segments.validate()
[docs]
def add_ligands(self):
"""
Add ligands to each atom based on the bonds defined in the PSF file.
This method iterates through the atoms and bonds, establishing ligand relationships
by adding each bonded atom to the ligand list of the corresponding atom.
It ensures that each atom's ligands are correctly populated based on the bonds defined in the PSF file.
Raises
------
AssertionError
If the `bonds` attribute is not set or if the atom serial numbers do not match the expected indices.
"""
assert hasattr(self, 'bonds')
for i, a in enumerate(self.atoms.data):
a.ligands = []
assert a.serial == i + 1
for b in self.bonds.data:
i, j = b.serial1, b.serial2
aix, ajx = i - 1, j - 1
ai, aj = self.atoms[aix], self.atoms[ajx]
ai.add_ligand(aj)
aj.add_ligand(ai)
[docs]
def get_charge(self):
"""
Calculate the total charge of the system by summing the charges of all atoms.
Returns
-------
float
The total charge of the system, calculated as the sum of the charges of all atoms in the PSF file.
"""
return np.sum([x.charge for x in self.atoms])
@staticmethod
def _atom_to_psf_line(a: 'PSFAtom') -> str:
"""Format a PSFAtom as an EXT PSF atom line."""
resid_str = str(a.resid)
return (f'{a.serial:10d} {a.segname:<8s} {resid_str:<8s} '
f'{a.resname:<8s} {a.atomname:<8s} {a.atomtype:<8s}'
f'{a.charge:9.6f}{a.atomicwt:14.4f}{0:12d}')
[docs]
def write(self, filename: str):
"""
Write the PSF contents to a file in EXT PSF format.
The ATOM section is rebuilt from the current :attr:`atoms` list; all other
sections are written verbatim from the lines read at parse time.
Parameters
----------
filename : str
Path of the output PSF file.
"""
with open(filename, 'w') as f:
f.write(self._psf_header + '\n')
f.write('\n')
for token_name in self.token_idx:
if token_name == 'ATOM':
header = f'{len(self.atoms):10d} !NATOM'
content = [self._atom_to_psf_line(a) for a in self.atoms.data]
else:
header = self._section_headers[token_name]
content = self.token_lines.get(token_name, [])
f.write(header + '\n')
for line in content:
f.write(line + '\n')
f.write('\n')