Source code for pestifer.core.command

# Author: Cameron F. Abrams, <cfa22@drexel.edu>
#
"""
Class for handling and running external commands and managing their output.
"""

import atexit
import logging
import os
import signal
import shutil
import subprocess

from glob import glob

from ..logparsers import LogParser
from ..util.util import running_under_pytest

logger = logging.getLogger(__name__)

[docs] class Command: """ Class for running external commands in a subprocess. This class allows you to create a command with its arguments and options, and then run it while capturing its output. The command is run in a shell, and you can specify a logfile to write the output to. If the command returns a non-zero exit code, an error is logged, and the stdout and stderr buffers are printed. You can also specify a tuple of (needle, message) to override the default behavior and log a custom message if the needle is found in the stdout or stderr. The command can be run with a progress bar and elapsed time display using a LogParser instance. The command can also be run quietly, suppressing the output to the console. """ divider_line_length = 55 """ The length of the divider line used in logging output to separate sections of the log. """ def __init__(self, command: str, *args, **options): """ Initializes a Command instance with a command, its arguments, and options. Parameters ---------- command : str The command to be executed. args : tuple A tuple of arguments to be passed to the command. options : dict A dictionary of options to be passed to the command, where keys are option names and values are option values. """ self.command = command self.args = args self.options = options self.c = f'{self.command} ' + ' '.join(args) + ' '.join([f'-{k} {v}' for k, v in self.options.items()]) self.stdout = '' self.stderr = ''
[docs] def run(self, logfile=None, override=(), ignore_codes=[], quiet=True, logparser: LogParser = None, log_stderr=False, **kwargs): """ Runs this Command instance Parameters ---------- logfile : str name of log file to write process' stdout and stderr. If None (default), stdout and stderr are retained only in this instance's stdout and stderr attributes. override : tuple tuple composed of a "needle" and a "message". If the needle is found in the stdout or stderr of the process, the message is displayed and an error is thrown, halting the program. ignore_codes : list a list of integer exit codes that are ignored in addition to 0 quiet : bool if True, suppresses all output from the command logparser: LogParser used for progress bar/elapsed time displays and log parsing """ _pytest = running_under_pytest() if not quiet: logger.debug(f'{self.c}') log = None if not logfile: if not quiet: logger.debug(f'No logfile specified for {self.c}') else: if os.path.exists(logfile) and not kwargs.get('overwrite_logs', False): nlogs = len(glob(f'%{logfile}')) shutil.move(logfile, f'%{logfile}-{nlogs+1}%') logger.debug(f'Rotating {logfile} to %{logfile}-{nlogs+1}%') log = open(logfile, 'w') logger.debug(f'Opened {logfile} for writing') if log_stderr: stderr_redirect = subprocess.STDOUT else: stderr_redirect = subprocess.PIPE process = subprocess.Popen(self.c, shell=True, stdout=subprocess.PIPE, stderr=stderr_redirect, text=True) global pid pid = process.pid def kill_child(): if pid is None: pass else: try: os.kill(pid, signal.SIGTERM) except: pass atexit.register(kill_child) self.stdout = '' self.stderr = '' output = '' while True: output = process.stdout.readline() self.stdout += output if logparser: logparser.update(output) if not _pytest: logparser.update_progress_bar() if log: log.write(output) log.flush() if output == '' and process.poll() is not None: break if hasattr(logparser, 'progress_bar') and logparser.progress_bar is not None: if not _pytest: logparser.progress_bar.finish() else: print() if logfile: logger.debug(f'Log written to {logfile}') log.close() remaining_stdout, self.stderr = process.communicate() self.stdout += remaining_stdout if logparser: logparser.update(remaining_stdout) if not _pytest and not (hasattr(logparser, 'progress_bar') and logparser.progress_bar is not None): logparser.update_progress_bar() if hasattr(logparser, 'finalize'): logparser.finalize() if hasattr(logparser, 'write_csv'): logparser.write_csv() if process.returncode != 0 and not process.returncode in ignore_codes: logger.error(f'Returncode: {process.returncode}') if len(self.stdout) > 0: logger.error('stdout buffer follows\n' + '*' * self.divider_line_length + '\n' + self.stdout + '\n' + '*' * self.divider_line_length) if len(self.stderr) > 0: logger.error('stderr buffer follows\n' + '*' * self.divider_line_length + '\n' + self.stderr + '\n' + '*' * self.divider_line_length) return process.returncode if len(override) == 2: needle, msg = override if needle in self.stdout or needle in self.stderr: logger.info(f'Returncode: {process.returncode}, but another error was detected:') logger.error(msg) if len(self.stdout) > 0 and needle in self.stdout: logger.error('stdout buffer follows\n' + '*' * self.divider_line_length + '\n' + self.stdout + '\n' + '*' * self.divider_line_length) if len(self.stderr) > 0 and needle in self.stderr: logger.error('stderr buffer follows\n' + '*' * self.divider_line_length + '\n' + self.stderr + '\n' + '*' * self.divider_line_length) return 0