# Source code for rtac.ac_functionalities.ta_runner

"""In this module classes to run the target algorithm and observe its output
through the wrapper are implemented."""

from abc import ABC, abstractmethod
from typing import Any
from multiprocessing import Event
import fcntl
import os
import sys
import time
import signal
import importlib
import argparse
from rtac.ac_functionalities.rtac_data import (
    ACMethod,
    Configuration,
    RTACData,
    RTACDatapp
)
from rtac.ac_functionalities.logs import RTACLogs


def non_block_read(ta_output: Any, logs: "RTACLogs | None" = None) -> bytes:
    """
    Read one line from a `subprocess.PIPE` stream without blocking the
    application until there is output. It checks for output and returns
    immediately if none is available.

    Parameters
    ----------
    ta_output : file object
        Readable `subprocess.PIPE` stream (e.g. `proc.stdout`). It must
        provide `fileno()` and `readline()`.
    logs : RTACLogs
        Object containing loggers and logging functions. Defaults to None,
        in which case read errors are not logged.

    Returns
    -------
    bytes
        The line that was read, or `b''` if reading raised an exception.
        Callers in this module compare the result against `b''` to detect
        "no output".
    """
    fd = ta_output.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    # This function is polled repeatedly on the same descriptor; only issue
    # the second fcntl call while the flag is still missing.
    if not flags & os.O_NONBLOCK:
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
        return ta_output.readline()
    except Exception as e:
        if logs is not None:
            logs.general_log(e)
        # Must be bytes: a str '' would pass the callers' `!= b''` guard and
        # be handed to the wrapper as if it were real output.
        return b''
class AbstractTARunner(ABC):
    """
    Abstract TARunner.

    Declares the methods a target algorithm runner has to implement.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    logs : RTACLogs
        Object containing loggers and logging functions.
    core : int
        Number of the parallel run started on a core.
    """

    @abstractmethod
    def __init__(self, scenario: argparse.Namespace, logs: RTACLogs,
                 core: int) -> None:
        """
        Initializes target algorithm runner, loads and instantiates target
        algorithm wrapper.
        """

    @abstractmethod
    def translate_config(self, config: Configuration) -> Any:
        """
        Convert dictionary representation of the configuration to the format
        needed by the wrapper to pass to the target algorithm.

        Parameters
        ----------
        config : Configuration
            Configuration of parameter values to run the problem instance
            with.

        Returns
        -------
        Any
            New representation of the configuration suitable for the target
            algorithm.
        """

    @abstractmethod
    def start_run(self, instance: str, config: Configuration,
                  rtac_data: RTACData | RTACDatapp) -> None:
        """
        Starts the target algorithm run and populates data necessary for
        coronation of tournament members.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        config : Configuration
            Representation of the configuration.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.

        Returns
        -------
        None
        """

    @abstractmethod
    def check_output(self, ta_output: bytes) -> None:
        """
        Checks the output, if there was any, and declares the instance as
        solved by the contender if the corresponding marker is present.

        Parameters
        ----------
        ta_output : bytes
            Output from `subprocess.PIPE`.

        Returns
        -------
        None
        """

    @abstractmethod
    def check_result(self) -> None:
        """
        If this contender solved the problem instance the rtac data is
        populated by the resulting information according to the RTAC method
        scenario.ac.

        Returns
        -------
        None
        """

    @abstractmethod
    def kill_run(self) -> None:
        """
        Terminates this process/ target algorithm run, as well as the other
        contenders. Several layers of termination are included to ensure
        termination on different platforms.

        Returns
        -------
        None
        """

    @abstractmethod
    def run(self, instance: str, config: Configuration,
            rtac_data: RTACData | RTACDatapp, sync_event: Event) -> None:
        """
        Manages the target algorithm runner functions depending on the state
        of the run, according to the RTAC method specified in `scenario.ac`.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        config : Configuration
            Configuration of parameter values to run the problem instance
            with.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        sync_event : Event
            Event the runner waits on before starting the target algorithm
            run.

        Returns
        -------
        None
        """
class BaseTARunner(AbstractTARunner):
    """
    Target algorithm runner for ReACTR implementation.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    logs : RTACLogs
        Object containing loggers and logging functions.
    core : int
        Identifier for the parallel run on a specific core.
    """

    def __init__(self, scenario: argparse.Namespace, logs: RTACLogs,
                 core: int) -> None:
        """
        Initializes the target algorithm runner, loads, and instantiates the
        target algorithm wrapper.
        """
        self.scenario = scenario
        if self.scenario.gray_box:
            # Timestamps consumed by the gray-box recording function.
            self.last_check = time.time()
            self.clock_start = time.time()
        self.logs = logs
        self.core = core
        # Value compared against the result entry in `run` to detect that a
        # result has been written.
        self.max_float = sys.float_info.max * 1e-100
        # The wrapper class is looked up by name in the configured module and
        # instantiated without arguments.
        module = importlib.import_module(scenario.wrapper)
        name = scenario.wrapper_name
        self.wrapper = getattr(module, name)()

    def translate_config(self, config: Configuration) -> Any:
        """
        Convert dictionary representation of the configuration to the format
        needed by the wrapper to pass to the target algorithm.

        Also remembers `config.id`, which `check_result` later reports as
        the winner.

        Parameters
        ----------
        config : Configuration
            Configuration of parameter values to run the problem instance
            with.

        Returns
        -------
        Any
            New representation of the configuration.
        """
        self.config_id = config.id
        translated_config = self.wrapper.translate_config(config)

        return translated_config

    def start_run(self, instance: str, config: Any,
                  rtac_data: RTACData | RTACDatapp) -> None:
        """
        Starts the target algorithm run and populates data necessary for
        coronation of tournament members.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        config : Any
            Representation of the configuration.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.

        Returns
        -------
        None
        """
        self.config = config
        self.instance = instance
        self.rtac_data = rtac_data
        self.rtac_data.substart[self.core] = time.process_time_ns()
        self.rtac_data.substart_wall[self.core] = time.time()
        if self.scenario.objective_min:
            # Wall-clock start used by `run` for the timeout check.
            self.om_start = time.time()
        self.proc, self.proc_cpu_time = \
            self.wrapper.start(self.config, self.scenario.timeout,
                               self.instance)
        self.pid = self.proc.pid
        self.rtac_data.pids[self.core] = self.pid
        self.running = True
        self.rtac_data.status[self.core] = 1  # TARunStatus.running

    def check_output(self, ta_output: bytes) -> None:
        """
        Checks the output, if there was any, and declares the instance as
        solved by the contender if the corresponding marker is present.

        Parameters
        ----------
        ta_output : bytes
            Output from `subprocess.PIPE`.

        Returns
        -------
        None
        """
        if ta_output != b'':
            if_solved = \
                self.wrapper.check_if_solved(ta_output, non_block_read,
                                             self.proc)
            if if_solved is not None:
                res, self.time, self.rtac_data.event = if_solved
                self.rtac_data.ta_res[self.core] = res
                self.rtac_data.ta_res_time[self.core] = self.time
                self.rtac_data.status[self.core] = 2  # TARunStatus.finished

            if self.scenario.gray_box:
                # `gb_record` is attached to the class by
                # `ta_runner_factory` when gray-box mode is enabled.
                self.gb_record(ta_output)

    def check_result(self) -> None:
        """
        If this contender solved the problem instance the rtac data is
        populated by the resulting information.

        Returns
        -------
        None
        """
        self.subnow = time.process_time_ns()
        self.rtac_data.ta_rtac_time[self.core] = \
            round(
                (time.process_time_ns() - self.proc_cpu_time) * 10**(-9)
                + self.time, 2)
        self.rtac_data.ev.set()
        self.rtac_data.event = 1
        if not self.scenario.objective_min:
            self.rtac_data.winner.value = self.config_id
        elif self.rtac_data.ta_res[self.core] <= \
                min(self.rtac_data.ta_res[:]):
            self.rtac_data.winner.value = self.config_id
        # Write through `.value`, as is done for `winner` and as `kill_run`
        # reads it; rebinding the attribute would replace the object with a
        # plain number and break the later `.value` access.
        self.rtac_data.newtime.value = self.rtac_data.ta_res_time[self.core]
        self.running = False
        self.rtac_data.status[self.core] = 2  # TARunStatus.finished

    def kill_run(self) -> None:
        """
        Terminates this process/ target algorithm run, as well as the other
        contenders. Several layers of termination are included to ensure
        termination on different platforms.

        Returns
        -------
        None
        """
        if self.rtac_data.status[self.core] != 2:
            self.rtac_data.status[self.core] = 3  # TARunStatus.capped
        self.running = False
        # Layer 1: polite termination, then a hard kill if still alive.
        self.proc.terminate()
        time.sleep(0.1)
        if self.proc.poll() is None:
            self.proc.kill()
            time.sleep(0.1)
        # Layer 2 (runtime minimization only): kill other contenders.
        if not self.scenario.objective_min:
            for ii in range(self.scenario.number_cores):
                # NOTE(review): this is "start - now"; for timestamps taken
                # from the same clock that difference is not positive, so
                # the branch looks unreachable for a positive `newtime` --
                # confirm the intended operand order.
                if (self.rtac_data.substart[ii]
                        - time.process_time_ns()) * 1e-9 >= \
                        self.rtac_data.newtime.value and \
                        ii != self.core:
                    os.kill(self.rtac_data.pids[ii], signal.SIGKILL)
                    time.sleep(0.1)
        # Layer 3: signal the pid directly; failures are logged, not raised.
        try:
            os.kill(self.pid, signal.SIGKILL)
        except Exception as e:
            self.logs.general_log(e)

    def run(self, instance: str, config: Configuration,
            rtac_data: RTACData | RTACDatapp, sync_event: Event) -> None:
        """
        Manages the target algorithm runner functions depending on the state
        of the run.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        config : Configuration
            Configuration of parameter values to run the problem instance
            with.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        sync_event : Event
            Event that is waited on before the target algorithm run is
            started.

        Returns
        -------
        None
        """
        sync_event.wait()
        self.start_run(instance, config, rtac_data)
        while self.running:
            # Avoid checking output excessively often (causes too much
            # overhead)
            time.sleep(0.005)
            ta_output = non_block_read(self.proc.stdout, self.logs)
            self.check_output(ta_output)
            # If result entry is different to default, populate result lists
            if self.rtac_data.ta_res[self.core] != self.max_float:
                self.check_result()
            # If objective minimization scenario
            if self.scenario.objective_min:
                # and if time limit is reached, kill this target algorithm
                # run + 1 sec extra time for the TA to shut down, wrap up and
                # print the result, since it is important to know it
                if time.time() - self.om_start >= self.scenario.timeout + 1:
                    # NOTE(review): kill_run() sets every status other than
                    # 2 to 3, so this 5 is overwritten right away -- confirm
                    # whether that is intended.
                    self.rtac_data.status[self.core] = 5  # TARunStatus.timeout
                    self.kill_run()
            # If runtime minimization and one TA run solved instance, kill
            # all target algorithm runs
            elif self.rtac_data.event == 1 or self.rtac_data.ev.is_set():
                time.sleep(2)
                self.kill_run()
class TARunnerpp(BaseTARunner):
    """
    Target algorithm runner for ReACTR++ implementation.

    Behaves like `BaseTARunner` and additionally stores intermediate target
    algorithm output at a throttled rate.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    logs : RTACLogs
        Object containing loggers and logging functions.
    core : int
        Identifier for the parallel run on a specific core.
    """

    def __init__(self, scenario: argparse.Namespace, logs: RTACLogs,
                 core: int) -> None:
        """
        Runs the base initialization and sets up the throttle for interim
        output checks (one 150th of the timeout between two checks).
        """
        super().__init__(scenario, logs, core)
        self.interim_check_increment = scenario.timeout / 150
        self.interim_check_time = time.time()

    def check_output(self, ta_output: bytes) -> None:
        """
        Checks the output, if there was any, and either declares the
        instance as solved by the contender (if the corresponding marker is
        present) or outputs intermediate target algorithm output.

        Parameters
        ----------
        ta_output : bytes
            Output from `subprocess.PIPE`.

        Returns
        -------
        None
        """
        # Nothing was read: nothing to inspect.
        if ta_output == b'':
            return

        solved = self.wrapper.check_if_solved(ta_output, non_block_read,
                                              self.proc)
        if solved is not None:
            result, self.time, self.rtac_data.event = solved
            self.rtac_data.ta_res[self.core] = result
            self.rtac_data.ta_res_time[self.core] = self.time
            self.rtac_data.status[self.core] = 2  # TARunStatus.finished

        # Reduce frequency of interim checks.
        since_last_check = time.time() - self.interim_check_time
        if since_last_check >= self.interim_check_increment:
            self.interim_check_time = time.time()
            snapshot = self.wrapper.check_output(ta_output)
            if snapshot is not None:
                self.rtac_data.interim[self.core] = snapshot

        if self.scenario.gray_box:
            self.gb_record(ta_output)
def gb_record(self, ta_output: bytes) -> None:
    """
    Records runtime output of the target algorithm if there was any new.

    The wrapper is consulted at most once per `scenario.gb_read_time`
    seconds. A record is stored only if the features it returns are not None
    and differ from the ones last stored for this core.

    Parameters
    ----------
    ta_output : bytes
        Output from `subprocess.PIPE`.

    Returns
    ------
    None
    """
    now = time.time()
    read_is_due = (now - self.last_check) >= self.scenario.gb_read_time
    if not read_is_due:
        return

    features = self.wrapper.check_output(ta_output)
    core = self.core
    if features is not None \
            and features != self.rtac_data.RuntimeFeatures[core]:
        self.rtac_data.RuntimeFeatures[core] = features
        cpu_ns = time.process_time_ns() - self.rtac_data.substart[core]
        cpu_seconds = cpu_ns * 1e-9
        wall_seconds = time.time() - self.clock_start
        record = {
            'core': core,
            'CPUTimeExpended': cpu_seconds,
            'ClockTimeExpended': wall_seconds,
            'rt_feats': features,
        }
        records = self.rtac_data.rec_data[core]
        # Records are keyed by whole seconds of elapsed wall-clock time.
        if record not in records.values():
            records[int(wall_seconds)] = record

    self.last_check = now
def ta_runner_factory(scenario: argparse.Namespace, logs: RTACLogs,
                      core: int) -> BaseTARunner | TARunnerpp:
    """
    Class factory to return the initialized target algorithm runner class
    appropriate to the RTAC method specified in `scenario.ac`.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    logs : RTACLogs
        Object containing loggers and logging functions.
    core : int
        Number of the parallel run started on a core.

    Returns
    -------
    BaseTARunner or TARunnerpp
        Initialized target algorithm runner object matching the RTAC method
        of the scenario.

    Raises
    ------
    ValueError
        If `scenario.ac` is none of the RTAC methods handled here.
    """
    if scenario.ac in (ACMethod.ReACTR, ACMethod.CPPL):
        tarunner = BaseTARunner
    elif scenario.ac == ACMethod.ReACTRpp:
        tarunner = TARunnerpp
    else:
        # Fail with a clear message instead of an unbound-variable error.
        raise ValueError(
            f'No target algorithm runner for RTAC method {scenario.ac!r}.')

    if scenario.gray_box:
        # The recording function is set as an attribute on the runner class
        # itself, so the runners' `check_output` can call `self.gb_record`.
        tarunner.gb_record = gb_record

    return tarunner(scenario, logs, core)
# Running this module directly as a script does nothing.
if __name__ == "__main__":
    pass