# Author: ChatGPT with modifications by Cameron F. Abrams, <cfa22@drexel.edu>
"""
Pipeline context for managing passing of information from one task to another via artifacts. All Artifact
types are defined in `~pestifer.core.artifacts`.
The main task of the pipeline is the _registration_ of Artifacts. This is done primarily via :meth:`PipelineContext.register`.
This method requires an object, a key, the id of the object requesting the registration, and the type of the Artifact.
"""
from __future__ import annotations
import logging
from .artifacts import Artifact, FileArtifact, ArtifactDict, ArtifactList, FileArtifactDict, FileArtifactList, StateArtifacts
from ..util.stringthings import my_logger
logger = logging.getLogger(__name__)
[docs]
class PipelineContext:
""" Context for managing the pipeline of artifacts by which tasks communicate.
Attributes
----------
head : ArtifactDict
Current artifacts organized into a dict by their keys
history : ArtifactList
All non-current artifacts in chronological order by their creation
controller_index : int
Index of the controller that owns this pipeline
"""
def __init__(self, controller_index: int = 0):
    """
    Set up the containers that hold this pipeline's artifacts.

    Parameters
    ----------
    controller_index : int
        Index of the controller that owns this pipeline (default 0).
    """
    self.controller_index = controller_index
    # Superseded / buried artifacts accumulate here.
    self.history: ArtifactList = ArtifactList(key='History')
    # Current artifacts, looked up by their keys.
    self.head: ArtifactDict = ArtifactDict(key='Head')
def __repr__(self):
return f"PipelineContext(controller_index={self.controller_index})"
[docs]
def register(self, data: object, key: str, requestor: object, artifact_type: type = Artifact, **kwargs) -> Artifact:
    """
    Artifact registrar.

    If an artifact with the requested key already exists in the head and its data
    compares equal to ``data``, the existing artifact is stamped with the requestor
    and returned.  If the data is different, the existing artifact is moved to the
    history and a new artifact is created in its place.  If no artifact with the
    requested key exists, a new artifact is created.  A new artifact is stamped with
    the requestor, stored in the head under ``key``, and returned.

    Parameters
    ----------
    data : object
        The data for the artifact to register.
    key : str
        Key under which the artifact is stored in the head.
    requestor : object
        The object requesting the registration; it is passed to the artifact's ``stamp``.
    artifact_type : type
        Class instantiated as ``artifact_type(data=data, key=key, **kwargs)`` when a
        new artifact is needed (default ``Artifact``).  The default has to be a
        callable class: a ``X | Y`` union object cannot be instantiated.
    **kwargs : dict
        Additional keyword arguments passed to the artifact constructor.

    Returns
    -------
    Artifact
        The current artifact for ``key`` after registration.
    """
    if key in self.head:
        existing_artifact = self.head[key]
        if existing_artifact.data == data:
            # same data, just stamp and return
            existing_artifact.stamp(requestor)
            logger.debug(f'Artifact with key {key} already exists with same data; stamped with {requestor}.')
            return existing_artifact
        # different data, move existing to history and fall through to create a new artifact
        self.history.append(existing_artifact)
        logger.debug(f'Artifact with key {key} already exists with different data; moved to history.')
    # NOTE: the value returned by stamp() is what gets stored, so stamp() is
    # assumed to return the artifact it was called on.
    new_artifact = artifact_type(data=data, key=key, **kwargs).stamp(requestor)
    self.head[key] = new_artifact
    msg = f'Registered \'{key}\' (type {type(new_artifact)}) from {requestor.__class__.__name__}'
    if kwargs:
        msg += f' with options {kwargs}'
    logger.debug(msg)
    return new_artifact
[docs]
def register_if_exists(self, data: object, requestor: object, artifact_type: type = FileArtifact, **kwargs) -> FileArtifact | None:
    """
    Register a file artifact only when the artifact reports that it exists.

    A candidate artifact is built as ``artifact_type(data, **kwargs)``.  It supplies
    the key used for registration and answers ``exists()``.  When ``exists()`` is
    true the work is delegated to :meth:`register` with the candidate's key;
    otherwise nothing is registered.

    Parameters
    ----------
    data : object
        The data for the artifact to register.
    requestor : object
        The object that is requesting the registration of the artifact.
    artifact_type : type
        The type of artifact to create (default is FileArtifact).
    **kwargs : dict
        Additional keyword arguments to pass to the artifact constructor.

    Returns
    -------
    FileArtifact | None
        The registered artifact if it exists, otherwise None.

    Raises
    ------
    ValueError
        If the object built from ``artifact_type`` is not a FileArtifact.
    """
    candidate = artifact_type(data, **kwargs)
    if not isinstance(candidate, FileArtifact):
        raise ValueError(f"Artifact type {artifact_type} is not a FileArtifact; cannot use register_if_exists.")
    if not candidate.exists():
        logger.debug(f'File artifact {candidate.name} does not exist; not registering.')
        return None
    return self.register(
        data=data,
        key=candidate.key,
        requestor=requestor,
        artifact_type=artifact_type,
        **kwargs,
    )
[docs]
def rekey(self, old_key: str, new_key: str):
"""
Change the key of an existing artifact in the head.
Parameters
----------
old_key : str
The current key of the artifact to rekey.
new_key : str
The new key to assign to the artifact.
Raises
------
ValueError
If the old_key does not exist in the head.
"""
if old_key not in self.head:
raise ValueError(f"Cannot rekey artifact: old key '{old_key}' does not exist.")
if new_key in self.head:
existing_artifact = self.head.pop(new_key)
self.history.append(existing_artifact)
artifact = self.head.pop(old_key)
artifact.key = new_key
self.head[new_key] = artifact
logger.debug(f'Rekeyed artifact from {old_key} to {new_key}.')
[docs]
def show_artifacts(self, header: str = 'Current Artifacts'):
    """
    Debugging utility: log a banner followed by the head and the history.

    Parameters
    ----------
    header : str
        Text placed in the banner ahead of the controller index.
    """
    rule = '*' * 72
    logger.debug(rule)
    logger.debug(header + f" Controller {self.controller_index:02d}")
    logger.debug(rule)
    logger.debug('Head:')
    self.show_artifact(self.head)
    logger.debug('History:')
    self.show_artifact(self.history)
    logger.debug(rule)
[docs]
def show_artifact(self, artifact: Artifact, depth: int = 1, include_id: bool = False):
    """
    Debugging utility: log one artifact, recursing into container artifacts.

    Parameters
    ----------
    artifact : Artifact
        The artifact to show.  ArtifactList, StateArtifacts and ArtifactDict
        instances are walked recursively; ``None`` entries of a StateArtifacts
        are skipped.
    depth : int
        Nesting level; each log line is indented by ``depth`` spaces.
    include_id : bool
        If True, ``id(artifact)`` is included in the log line.  The setting is
        carried into the recursive calls so nested items are labelled as well.
    """
    my_id_str = f' ({id(artifact)})' if include_id else ''
    indent = " " * depth
    # StateArtifacts is tested before ArtifactDict, as in the original ordering,
    # so that it gets its own label and its None entries are skipped.
    if isinstance(artifact, ArtifactList):
        logger.debug(f'{indent}- "{artifact.key}" {my_id_str} List with {len(artifact)} items:')
        for item in artifact:
            self.show_artifact(item, depth + 1, include_id)
    elif isinstance(artifact, StateArtifacts):
        logger.debug(f'{indent}- "{artifact.key}" {my_id_str} StateArtifacts with {len(artifact)} items:')
        for _, item in artifact.items():
            if item is not None:
                self.show_artifact(item, depth + 1, include_id)
    elif isinstance(artifact, ArtifactDict):
        logger.debug(f'{indent}- "{artifact.key}" {my_id_str} Dict with {len(artifact)} items:')
        for _, item in artifact.items():
            self.show_artifact(item, depth + 1, include_id)
    elif isinstance(artifact, Artifact):
        pytestable_str = ' (pytestable)' if getattr(artifact, 'pytestable', False) else ''
        logger.debug(f'{indent}- "{artifact.key}" {my_id_str}: data=\'{artifact.data}\' {pytestable_str}')
    else:
        # An unknown object (for example a None entry of a dict) need not have a
        # ``key`` attribute; a debugging aid should report it rather than raise.
        unknown_key = getattr(artifact, 'key', None)
        logger.debug(f'{indent}- "{unknown_key}" {my_id_str}: ***Unknown artifact type: {type(artifact)}***')
[docs]
def bury(self, artifact: Artifact):
"""
Bury an artifact in the history without registering it as a current artifact
Parameters
----------
artifact : Artifact
The artifact to bury.
"""
self.history.append(artifact)
[docs]
def get_current_artifact(self, key: str, **kwargs) -> Artifact | None:
return self.head.get(key, None)
[docs]
def get_current_artifact_data(self, key: str):
"""
Retrieve the data of an artifact by its key.
Parameters
----------
key : str
The key for the artifact to retrieve.
Returns
-------
Any
The value of the artifact, or None if not found.
"""
artifact = self.get_current_artifact(key)
if artifact:
return artifact.data
return None
[docs]
def get_artifact_series_by_key(self, key: str = '') -> ArtifactList | FileArtifactList:
    """
    Retrieve the series of artifacts that share one key.

    Parameters
    ----------
    key : str
        The key for the artifacts to retrieve.

    Returns
    -------
    ArtifactList | FileArtifactList
        The artifacts in the history whose key matches, in history order,
        followed by the current artifact for that key (most recent at the end).
        If every member of the series is a FileArtifact the result is a
        FileArtifactList; this includes the case of an empty series.
    """
    series = ArtifactList([h for h in self.history if h.key == key])
    current = self.get_current_artifact(key)
    # Explicit None test: a current artifact that is an empty container is falsy
    # but still belongs to the series.
    if current is not None:
        series.append(current)  # most recent at end
    if all(isinstance(x, FileArtifact) for x in series):
        return FileArtifactList(series)
    return series
[docs]
def get_state_artifact(self, produced_by: object | None = None) -> StateArtifacts | None:
"""
Retrieve the most recent StateArtifacts produced by a specific task or object.
Parameters
----------
produced_by : object | None
The task or object that produced the artifacts to retrieve.
Returns
-------
StateArtifacts | None
The most recent StateArtifacts produced by the specified task, or None if not found.
"""
my_history = self.history
my_current = self.head
if produced_by is not None:
my_history = self.history.filter_by_produced_by(produced_by=produced_by)
my_current = self.head.filter_by_produced_by(produced_by=produced_by)
all_artifacts = my_history + my_current.to_list()
for artifact in reversed(all_artifacts):
if isinstance(artifact, StateArtifacts):
logger.debug(f'Found StateArtifacts produced by {produced_by if produced_by else "any task"}.')
return artifact
logger.debug(f'No StateArtifacts found produced by {produced_by if produced_by else "any task"}.')
return None
[docs]
def get_all_file_artifacts(self, produced_by: object | None = None) -> FileArtifactList:
    """
    Collect the file artifacts found in the history and the head.

    FileArtifact instances are taken as they are; FileArtifactList and
    FileArtifactDict instances contribute their members.  Anything else is
    ignored with a debug message.  The collection is passed through
    ``unique_paths()`` and only artifacts whose ``exists()`` is true are returned.

    Parameters
    ----------
    produced_by : object | None
        When given, the history and the head are first narrowed with their
        ``filter_by_produced_by`` methods; when None, all artifacts are considered.

    Returns
    -------
    FileArtifactList
        The existing file artifacts that were collected.
    """
    past, present = self.history, self.head
    if produced_by is not None:
        past = self.history.filter_by_produced_by(produced_by=produced_by)
        present = self.head.filter_by_produced_by(produced_by=produced_by)
    collected = FileArtifactList()
    for candidate in past + present.to_list():
        if isinstance(candidate, FileArtifact):
            collected.append(candidate)
            continue
        if isinstance(candidate, FileArtifactList):
            collected.extend(candidate.data)
            continue
        if isinstance(candidate, FileArtifactDict):
            collected.extend(candidate.to_list())
            continue
        logger.debug(f'Ignoring non-file artifact "{candidate.key}" of type {type(candidate)}')
    collected = collected.unique_paths()
    who = produced_by if produced_by else "all tasks"
    logger.debug(f'Found {len(collected)} file artifacts produced by {who}.')
    my_logger([entry.name for entry in collected], logger.debug, depth=1)
    return FileArtifactList([entry for entry in collected if entry.exists()])
[docs]
def context_to_string(self) -> str:
"""
Report the current artifacts in the pipeline context.
Returns
-------
str
A string representation of the current artifacts in the pipeline context.
"""
return "\n".join([f"{k}: {a.data}" for k, a in self.head.items()])
[docs]
def history_to_string(self) -> str:
"""
Report the history of artifacts in the pipeline context.
Returns
-------
str
A string representation of the history of artifacts in the pipeline context.
"""
return "\n".join([f"{h.key}: {h.data} (produced by {h.produced_by})" for h in self.history])
[docs]
def stash(self, key: str) -> str:
"""
Stash the current artifact under a new key.
Parameters
----------
key : str
The key under which to stash the current artifact.
"""
artifact = self.head.pop(key, None)
if artifact:
self.history.append(artifact)
[docs]
def import_artifacts(self, other: PipelineContext):
    """
    Import artifacts from another pipeline context.

    Each artifact in the other context's head is adopted into this head when its
    key is not already present here; otherwise it is appended to this history.
    The other context's history is then appended to this history, and the
    resulting artifacts are logged via :meth:`show_artifacts`.

    Parameters
    ----------
    other : PipelineContext
        The context whose artifacts are imported.
    """
    logger.debug(f'Importing artifacts from {other!r} into {self!r}')
    for incoming_key, incoming in other.head.items():
        if incoming_key in self.head:
            # This context's own current artifact wins; the incoming one is archived.
            self.history.append(incoming)
            continue
        self.head[incoming_key] = incoming
    self.history.extend(other.history)
    self.show_artifacts()