# Author: Cameron F. Abrams, <cfa22@drexel.edu>
"""
A class for building the asymmetric unit from a PDB file
"""
from __future__ import annotations
import logging
from argparse import Namespace
from mmcif.api.PdbxContainers import DataContainer
from pathlib import Path
from pidibble.pdbrecord import PDBRecordDict
from typing import ClassVar, TYPE_CHECKING
from .atom import AtomList, Atom, Hetatm
from .chainidmanager import ChainIDManager
from .residue import Residue, ResidueList, ResiduePlaceholderList
from .segment import SegmentList
from ..core.objmanager import ObjManager
from ..objs.graft import GraftList
from ..objs.link import LinkList
from ..objs.mutation import Mutation, MutationList
from ..objs.patch import PatchList
from ..objs.seqadv import SeqadvList
from ..objs.ssbond import SSBondList
from ..objs.ter import TerList
from ..psfutil.psfcontents import PSFContents
from ..util.util import write_residue_map
from ..util.stringthings import parse_filter_expression
if TYPE_CHECKING:
from .molecule import Molecule
logger = logging.getLogger(__name__)
[docs]
class AsymmetricUnit:
"""
A class for building the asymmetric unit from a PDB/mmCIF file.
Parameters
----------
parsed : PDBRecordDict or DataContainer
The parsed PDB or mmCIF data. A PDBRecordDict is returned by `pidibble.pdbparse.PDBParser(spec).parse().parsed`, and a DataContainer is returned by `~pestifer.util.cifutil.CIFload(filepath)`.
sourcespecs : dict, optional
Specifications for the source of the data, including exclusions and sequence specifications.
objmanager : ObjManager, optional
An object manager to handle the objects in the asymmetric unit. Defaults to a new instance of `ObjManager`.
chainIDmanager : ChainIDManager, optional
A manager for chain IDs. Defaults to a new instance of `ChainIDManager`.
psf : str or Path, optional
Path to a PSF file to use for additional information about the structure. Defaults to None. If set, it must be congruent to the PDB/mmCIF data.
"""
_residue_excludables: ClassVar[dict] = {'resnames':'resname','chains':'chainID','segtypes':'segtype'}
"""
Attributes that can be excluded from the asymmetric unit based on user specifications.
Key is the YAML key used the input file and value is the attribute of the residue to check against.
"""
_atom_excludables: ClassVar[dict] ={'altlocs':'altloc'}
"""
Attributes that can be excluded from the asymmetric unit based on user specifications.
Key is the YAML key used the input file and value is the attribute of the atom to check against.
"""
[docs]
def describe(self):
return f'<AsymmetricUnit: {len(self.atoms)} atoms, {len(self.residues)} residues>'
def __repr__(self):
return self.describe()
def __init__(self,
parsed: PDBRecordDict | DataContainer = None,
sourcespecs: dict = {},
objmanager: ObjManager = ObjManager(),
chainIDmanager: ChainIDManager = ChainIDManager(),
psf: str | Path = None):
self.parent_molecule: Molecule = None
missings = ResiduePlaceholderList([])
ssbonds = SSBondList([])
links = LinkList([])
ters = TerList([])
seqadvs = SeqadvList([])
patches = PatchList([])
grafts = GraftList([])
# These will be built below
self.atoms = AtomList([])
self.residues = ResidueList([])
self.segments = SegmentList([])
self.objmanager = objmanager
self.chainIDmanager = chainIDmanager
self.psfcontents = PSFContents(psf) if psf else None
self.ignored = Namespace()
self.pruned = Namespace()
atom_include_logic = sourcespecs.get('include', [])
atom_exclude_logic = sourcespecs.get('exclude', [])
if len(atom_exclude_logic) > 0 and len(atom_include_logic) > 0:
raise ValueError('Cannot specify both include and exclude logic for atoms')
if not parsed:
# return an empty unconstructed asymmetric unit
return
if isinstance(parsed, PDBRecordDict): # PDB format
# minimal parsed has ATOMS
model_id = sourcespecs.get('model', None)
atoms = AtomList.from_pdb(parsed, model_id=model_id)
ters = TerList.from_pdb(parsed, model_id=model_id)
if len(atoms) == 0:
raise ValueError(f'No ATOM/HETATM records found in parsed data')
if sourcespecs.get('reserialize', False) or self.psfcontents is not None:
atoms.reserialize()
else:
atoms.sort(by=['serial'])
if ters.has_serials(): atoms.reserialize()
objmanager.ingest(ters)
missings = ResiduePlaceholderList.from_pdb(parsed)
ssbonds = SSBondList.from_pdb(parsed)
seqadvs = SeqadvList.from_pdb(parsed)
logger.debug(f'Parsed {len(atoms)} atoms, {len(missings)} missing residues, {len(ssbonds)} ssbonds, {len(seqadvs)} seqadvs, {len(ters)} ters')
links = LinkList.from_pdb(parsed)
links.remove_duplicates(fields=['chainID1', 'resid1', 'chainID2', 'resid2']) # some pdb files list links multiple times (2ins, i'm looking at you)
elif isinstance(parsed, DataContainer): # mmCIF format
atoms = AtomList.from_cif(parsed)
seqadvs = SeqadvList.from_cif(parsed)
missings = ResiduePlaceholderList.from_cif(parsed)
ssbonds = SSBondList.from_cif(parsed)
links = LinkList.from_cif(parsed)
else:
raise TypeError(f'Cannot construct Asymmetric Unit from data of type {type(parsed)}; expected PDBRecordDict or DataContainer')
uniq_attr = atoms.uniqattrs(['chainID'])
logger.debug(f'ChainIDs in structure: {uniq_attr["chainID"]}')
logger.debug(f'Samples of first atoms in each chainID:')
for chainID in uniq_attr["chainID"]:
first_atom = atoms.get(lambda x: x.chainID == chainID)[0]
logger.debug(f' {chainID}: {first_atom.resname} {first_atom.resid.resid}')
if self.psfcontents is not None:
assert len(self.psfcontents.atoms) == len(atoms), f'Error: psf file {psf} has wrong number of atoms {len(self.psfcontents.atoms)}, expected {len(atoms)}'
logger.debug(f'PSF defines {len(self.psfcontents.atoms)} atoms')
logger.debug(f'PSF defines {len(self.psfcontents.segments)} segments: {self.psfcontents.segnames}')
logger.debug(f'Samples of first atom in each segment:')
for segname in self.psfcontents.segnames:
first_atom = self.psfcontents.atoms.get(lambda x: x.segname == segname)[0]
logger.debug(f' {segname}: {first_atom.resname} {first_atom.resid.resid}')
atoms.apply_psf_attributes(self.psfcontents.atoms)
# note we expect that there are NO ssbonds or links in the pdb file
# because it is not a Structure File from the PDB (since it has a companion PSF)
ssbonds.extend(self.psfcontents.ssbonds)
links.extend(self.psfcontents.links)
patches.extend(self.psfcontents.generic_patches)
if len(self.psfcontents.ssbonds) > 0:
logger.debug(f'PSF file {psf} identifies {len(self.psfcontents.ssbonds)} ssbonds; total ssbonds now {len(ssbonds)}')
if len(self.psfcontents.links) > 0:
logger.debug(f'PSF file {psf} identifies {len(self.psfcontents.links)} links; total links now {len(links)}')
if len(self.psfcontents.generic_patches) > 0:
logger.debug(f'PSF file {psf} identifies {len(self.psfcontents.generic_patches)} generic patches')
# at this point the objmanager is holding all mods that are in
# the input file but NOT in the PDB/mmCIF/psf file.
seqmods = objmanager.get('seq', {})
for k,v in seqmods.items():
if k != 'grafts':
if len(v) > 0:
logger.debug(f'Seqmod {k}')
for mod in v:
logger.debug(f' {str(mod)}')
topomods = objmanager.get('topol', {})
grafts: GraftList = seqmods.get('grafts', GraftList([]))
userlinks = topomods.get('links', LinkList([]))
links.extend(userlinks)
userssbonds = topomods.get('ssbonds', SSBondList([]))
for u in userssbonds:
logger.debug(f'Adding user-specified ssbond {u}')
ssbonds.extend(userssbonds)
userpatches = seqmods.get('patches', PatchList([]))
for u in userpatches:
logger.debug(f'Adding user-specified patch {u}')
patches.extend(userpatches)
# Build the list of residues
ignored_atom_count = 0
ignored_missing_residue_count = 0
if atom_include_logic or atom_exclude_logic:
logger.debug(f'Applying atom logics')
if atom_include_logic:
logger.debug(f'Atoms: Applying atom inclusion logic: {atom_include_logic}')
ignored_atom_count += atoms.apply_inclusion_logics(atom_include_logic)
elif atom_exclude_logic:
logger.debug(f'Atoms: Applying atom exclusion logic: {atom_exclude_logic}')
ignored_atom_count += atoms.apply_exclusion_logics(atom_exclude_logic)
logger.debug(f'Ignored {ignored_atom_count} atoms from PDB by inclusion/exclusion logic')
if atom_include_logic:
logger.debug(f'Missing residues: Applying atom inclusion logic: {atom_include_logic}')
ignored_missing_residue_count += missings.apply_inclusion_logics(atom_include_logic)
elif atom_exclude_logic:
logger.debug(f'Missing residues: Applying atom exclusion logic: {atom_exclude_logic}')
ignored_missing_residue_count += missings.apply_exclusion_logics(atom_exclude_logic)
logger.debug(f'Ignored {ignored_missing_residue_count} missing residues from PDB by inclusion/exclusion logic')
if self.psfcontents is not None:
ignored_psfatom_count = self.psfcontents.apply_atom_logics(atom_include_logic, atom_exclude_logic)
assert len(atoms) == len(self.psfcontents.atoms), f'Atom logic is not consistent between PDB and PSF'
assert ignored_psfatom_count == ignored_atom_count, f'Ignored atom count is not consistent between PDB and PSF'
logger.debug(f'Ignored {ignored_psfatom_count} atoms from PSF by inclusion/exclusion logic')
fromAtoms = ResidueList.from_residuegrouped_atomlist(atoms)
fromResiduePlaceholders = ResidueList.from_ResiduePlaceholderlist(missings)
if self.psfcontents is not None:
assert len(fromResiduePlaceholders) == 0, f'Expected no missing residues because you specified a companion PSF, but found {len(fromResiduePlaceholders)}'
residues: ResidueList = fromAtoms + fromResiduePlaceholders
residues.apply_segtypes()
# for r in residues:
# logger.debug(str(r))
# apply seqmods
if 'deletions' in seqmods:
residues.deletion(seqmods['deletions'])
if 'substitutions' in seqmods and len(seqmods['substitutions']) > 0:
logger.debug(f'Applying {len(seqmods["substitutions"])} substitutions...')
new_seqadv, wearegone = residues.substitutions(seqmods['substitutions'])
seqadvs.extend(new_seqadv)
logger.debug(f'There are now {len(seqadvs)} seqadv elements in seqadvs ({type(seqadvs)})')
uniques = residues.uniqattrs(['segtype'], with_counts=True)
nResolved = sum([1 for x in residues if x.resolved])
nUnresolved = sum([1 for x in residues if not x.resolved])
logger.debug(f'{len(residues)} total residues: {nResolved} resolved and {nUnresolved} unresolved')
if 'deletions' in seqmods and len(seqmods['deletions']) > 0:
nResolvedDelete = len(fromAtoms) - nResolved
nUnresolvedDelete = len(fromResiduePlaceholders) - nUnresolved
logger.debug(f'Deletions removed {nResolvedDelete} resolved and {nUnresolvedDelete} unresolved residues')
if 'insertions' in seqmods and len(seqmods['insertions']) > 0:
residues.apply_insertions(seqmods['insertions'])
logger.debug(f'Renumbering residues')
residues.renumber(links)
uniques = residues.uniqattrs(['segtype'], with_counts=True)
logger.debug(f'Segtypes present: {uniques["segtype"]}')
# Delete any residues dictated by user-specified exclusions
# thru_dict = {resattr: excludes.get(yaml, []) for yaml, resattr in self._residue_excludables.items()}
# logger.debug(f'Exclusions: {thru_dict}')
# # delete residues that are in user-specified exclusions
# ignored_residues = residues.prune_exclusions(residues.dict_to_condition(thru_dict))
# populates a specific mapping of PDB chainID to CIF label_asym_id
# This is only meaningful if mmCIF input is used
residues.map_chainIDs_label_to_auth()
ignored_seqadvs = seqadvs.assign_residues(residues)
logger.debug(f'{len(seqadvs)} seqadvs after assign_residues')
# for s in seqadvs:
# logger.debug(f'{str(s)}')
ignored_ssbonds = ssbonds.assign_residues(residues)
logger.debug(f'{len(patches)} patches before assign_residues')
ignored_patches = patches.assign_residues(residues)
logger.debug(f'{len(ssbonds)} ssbonds after assign_residues')
# for b in ssbonds:
# logger.debug(f'{str(b)}')
logger.debug(f'{len(patches)} patches after assign_residues')
# for p in patches:
# logger.debug(f'{str(p)}')
more_ignored_residues, ignored_links = links.assign_residues(residues)
logger.debug(f'{len(links)} links after assign_residues')
# for l in links:
# logger.debug(f'{str(l)}')
ignored_residues = ResidueList([])
ignored_residues.extend(more_ignored_residues)
ignored_grafts = grafts.assign_residues(residues, links)
# ignored_links.extend(new_ignored_links)
total_ignored_residue_count = len(ignored_residues) + ignored_missing_residue_count
if (ignored_atom_count + total_ignored_residue_count + len(ignored_seqadvs) + len(ignored_ssbonds) + len(ignored_links) + len(ignored_grafts)) > 0:
logger.debug(f'Inclusion/exclusion logic results in deletion of:')
logger.debug(f' {ignored_atom_count} atoms, {len(atoms)} remain')
logger.debug(f' {total_ignored_residue_count} residues, {len(residues)} remain')
logger.debug(f' {len(ignored_seqadvs)} seqadvs, {len(seqadvs)} remain')
logger.debug(f' {len(ignored_ssbonds)} ssbonds, {len(ssbonds)} remain')
logger.debug(f' {len(ignored_patches)} patches, {len(patches)} remain')
logger.debug(f' {len(ignored_links)} links, {len(links)} remain')
logger.debug(f' {len(ignored_grafts)} grafts, {len(grafts)} remain')
logger.debug(f'{len(residues)} residues survived parsing')
if self.psfcontents is not None:
logger.debug(f'Removing {len(ignored_residues)} residues from PSFContents because they were ignored by inclusion/exclusion logic')
self.psfcontents.remove_ignored_residues(ignored_residues)
if len(grafts)>0:
logger.debug('Grafts:')
for g in grafts:
logger.debug(str(g))
# provide specifications of how to handle sequence issues
# implied by PDB input
seq_specs = sourcespecs.get('sequence', {})
# Insertions that land beyond the last resolved residue of their chain extend
# the C-terminus — they must be built regardless of include_C_termini.
if 'insertions' in seqmods and len(seqmods['insertions']) > 0:
c_term_ext_chains = []
for ins in seqmods['insertions'].data:
ins_chainID = ins.chainID
ins_resseqnum = ins.resid.resseqnum
resolved_res = [r for r in residues.data if r.chainID == ins_chainID and r.segtype == 'protein' and r.resolved]
if not resolved_res:
continue
max_resolved_resseqnum = max(r.resid.resseqnum for r in resolved_res)
if ins_resseqnum >= max_resolved_resseqnum:
if ins_chainID not in c_term_ext_chains:
c_term_ext_chains.append(ins_chainID)
if c_term_ext_chains:
existing = seq_specs.setdefault('build_zero_occupancy_C_termini', [])
for cid in c_term_ext_chains:
if cid not in existing:
existing.append(cid)
logger.debug(f'C-terminal insertions detected in chains {c_term_ext_chains}; added to build_zero_occupancy_C_termini')
psfcompanion = None
if self.psfcontents:
psfcompanion = self.psfcontents.segments
self.segments.generate_from_residues(residues=residues, seq_spec=seq_specs, chainIDmanager=chainIDmanager, psfcompanion=psfcompanion, links=links)
# this may have altered chainIDs for some residues. So we must
# be sure all mods that are chainID-specific but
# not inheritable by segments are updated
seqadvs.update_attr_from_obj_attr('chainID', 'residue', 'chainID')
seqadvs.map_attr('dbRes', 'dbRes', {'HIS': 'HSD'})
ssbonds.update_attr_from_obj_attr('chainID1', 'residue1', 'chainID')
ssbonds.update_attr_from_obj_attr('chainID2', 'residue2', 'chainID')
links.update_attr_from_obj_attr('chainID1', 'residue1', 'chainID')
links.update_attr_from_obj_attr('chainID2', 'residue2', 'chainID')
for link in links:
if link.residue1 is not None:
link.resid1 = link.residue1.resid
if link.residue2 is not None:
link.resid2 = link.residue2.resid
res_segname_map = self.segments.get_residue_to_segname_map()
for link in links:
if link.residue1 is not None:
link.segname1 = res_segname_map.get(id(link.residue1), link.residue1.chainID)
if link.residue2 is not None:
link.segname2 = res_segname_map.get(id(link.residue2), link.residue2.chainID)
# Set g.chainID and g.graft_segname to the psfgen segname of the segment
# that contains the receiver residue. inherit_objs routes objects by
# x.chainID == segment.segname, so this ensures the graft ends up in the
# glycan segment (e.g. 'GG02'), not in the protein segment (e.g. 'G').
# Donor residues will be written into that same glycan segment in psfgen.
for g in grafts.data:
receiver_segname = res_segname_map.get(id(g.residues[0]), g.residues[0].chainID)
g.chainID = receiver_segname
g.graft_segname = receiver_segname
logger.debug(f'Graft {g.obj_id} donor residues will join segment {g.graft_segname}')
logger.debug(f'Segnames in A.U.: {", ".join(self.segments.segnames)}')
if self.segments.daughters:
logger.debug(f'Daughter chains generated: {self.segments.daughters}')
logger.debug(f'Used chainIDs {chainIDmanager.Used}')
# promote sequence numbers in any grafts to avoid collisions
# and include the graft links in the overall list of links
next_resid = max([x.resid for x in residues]) + 1
for g in grafts.data:
next_resid = g.set_internal_resids(next_resid)
links.extend(g.donor_internal_links)
links.extend(g.donor_external_links)
# Update segnames on graft links: receiver residues look up res_segname_map;
# donor residues (absent from map) use the graft's segname.
for g in grafts.data:
for l in list(g.donor_internal_links.data) + list(g.donor_external_links.data):
if l.residue1 is not None:
l.segname1 = res_segname_map.get(id(l.residue1), g.graft_segname)
if l.residue2 is not None:
l.segname2 = res_segname_map.get(id(l.residue2), g.graft_segname)
# at this point, we have built the asymmetric unit according to the intention of the
# author of the structure AND the intention of the user in excluding certain parts
# of that structure (ligands, ions, chains, etc). At this point, we should apply
# any user-defined modifications
# First, scan all seqadv's for relevant mutations to apply
mutations = MutationList([])
if seq_specs.get('fix_engineered_mutations', False):
mutations.extend(MutationList([Mutation(s) for s in seqadvs if s.typekey == 'engineered mutation']))
if seq_specs.get('fix_conflicts', False):
mutations.extend(MutationList([Mutation(s) for s in seqadvs if s.typekey == 'conflict']))
mutations.extend(MutationList([Mutation(s) for s in seqadvs if s.typekey == 'user']))
# Now append these to the objmanager's mutations
objmanager.ingest(mutations)
mutations = objmanager.get('seq', {}).get('mutations', MutationList([]))
if len(mutations) > 0:
logger.debug(f'All mutations')
mutations.sort(by=['typekey'])
for m in mutations:
logger.debug(str(m) + ' ' + m.typekey)
pruned_objects = self.segments.prune_topology(mutations, links, ssbonds)
# pruned_ssbonds = ssbonds.prune_mutations(mutations)
# pruned = pruned_by_links = links.prune_mutations(mutations, segments)
# logger.debug(f'Pruned objects: {pruned_objects}')
pruned_segments: SegmentList = pruned_objects.get('segments', [])
for s in pruned_segments.data:
if s.segname in chainIDmanager.Used:
logger.debug(f'Unregistering chainID {s.segname} because this entire segment was pruned')
chainIDmanager.unregister_chain(s.segname)
else:
logger.debug(f'Pruned segment {s.segname} (chainID={s.chainID}) is a glycan; no chainID to unregister')
# Now any explicitly deleted ssbonds
topomods = objmanager.get('topol', {})
if 'ssbondsdelete' in topomods and len(topomods['ssbondsdelete']) > 0:
logger.debug(f'SSBonds to delete: {topomods["ssbondsdelete"]} ')
for s in ssbonds:
logger.debug(f'Examining ssbond {s}')
if topomods['ssbondsdelete'].is_deleted(s):
logger.debug(f'Deleting ssbond {s}')
ignored_ssbonds.append(ssbonds.remove_and_return(s))
# finalize the objmanager
for olist in [ssbonds, links, grafts, patches]:
objmanager.ingest(olist, overwrite=True)
self.segments.inherit_objs(objmanager)
self.atoms = atoms
self.residues = residues
self.objmanager = objmanager
self.chainIDmanager = chainIDmanager
self.psf = psf
self.ignored = Namespace(residues=ignored_residues, links=ignored_links, ssbonds=ignored_ssbonds, seqadv=ignored_seqadvs)
self.pruned = Namespace(**pruned_objects)
[docs]
def set_parent_molecule(self, parent_molecule):
"""
Sets the parent molecule of the asymmetric unit.
Parameters
----------
parent_molecule : Any
The parent molecule to set for the asymmetric unit.
"""
if self.parent_molecule is not None:
raise RuntimeError("Parent molecule is already set and cannot be changed.")
self.parent_molecule = parent_molecule
for segment in self.segments:
segment.set_parent_molecule(parent_molecule)
[docs]
def add_segment(self, seg):
"""
Adds a segment to the asymmetric unit.
Parameters
----------
seg : Segment
The segment to add to the asymmetric unit.
"""
self.segments.append(seg)
[docs]
def ingest_grafts(self, grafts: GraftList, residues: ResidueList, links: LinkList):
"""
Ingests grafts into the asymmetric unit.
Parameters
----------
grafts : GraftList
The list of grafts to ingest.
residues : ResidueList
The list of residues in the asymmetric unit.
links : LinkList
The list of links in the asymmetric unit.
"""
for g in grafts.data:
graft_chainID = g.chainID
chain_residues = residues.newfilter(lambda x: x.chainID == graft_chainID)
# last_chain_residue_idx=residues.index(chain_residues[-1])
next_available_resid = max([x.resid for x in chain_residues]) + 1
g_topomods = g.source_molecule.objmanager.get('topol', {})
g_links = g_topomods.get('links', LinkList([]))
g.source_seg = g.source_molecule.asymmetric_unit.segments.get(lambda x: x.segname == g.source_chainID)
# build the list of new residues this graft contributes; the index residue is not included!
g.my_residues = ResidueList([])
for residue in g.source_seg.residues.data:
if residue > f'{g.source_resid1}' and residue <= f'{g.source_resid2}':
residue.set_resid(next_available_resid)
residue.set_chainID(graft_chainID)
next_available_resid += 1
g.my_residues.append(residue)
# only ingest links that are internal to this set of residues
ingested_links = 0
for l in g_links.data:
if l.residue1 in g.my_residues and l.residue2 in g.my_residues:
links.append(l)
ingested_links += 1
logger.debug(f'Chain {graft_chainID} of raw asymmetric unit ingests {len(g.my_residues)} residues and {ingested_links} links from graft {g.id}')
# residues.insert(last_chain_residue_idx+1,r)
# last_chain_residue_idx+=1
[docs]
def set_coords(self, altstruct: PDBRecordDict):
"""
Sets the coordinates of the asymmetric unit from an alternative structure.
Parameters
----------
altstruct : dict
The alternative structure containing atomic coordinates.
"""
atoms = AtomList([Atom(p) for p in altstruct[Atom.PDB_keyword]])
atoms.extend([Hetatm(p) for p in altstruct.get(Hetatm.PDB_keyword, [])])
# ters = TerList([Ter(p) for p in altstruct.get(Ter.PDB_keyword, [])])
atoms.reserialize()
# atoms.adjustSerials(ters)
altRes = ResidueList(atoms)
overwrites = []
for ar in altRes:
tr: Residue = self.residues.get(lambda x: x.resname == ar.resname and x.resid == ar.resid and x.chainID == ar.chainID)
if tr:
tr.atoms.overwrite_positions(ar.atoms)
overwrites.append(ar)
logger.debug(f'set_coords: {len(overwrites)} residues overwritten')