Source code for rtac.ac_functionalities.tournament

"""This module contains classes implementing problem instance solving
tournaments by differently configured target algorithms according to the RTAC
method utilized."""

from abc import ABC, abstractmethod
import multiprocessing as mp
import subprocess
import os
import uuid
import time
import signal
from rtac.utils.process_affinity import set_affinity_recursive
from rtac.ac_functionalities.config_gens import DefaultConfigGen
from rtac.ac_functionalities.rtac_data import (
    TournamentStats,
    TARun,
    TARunStatus
)
from rtac.ac_functionalities.ta_runner import BaseTARunner
from rtac.ac_functionalities.rtac_data import (
    Configuration,
    RTACData,
    ACMethod,
    RTACDatapp
)
from rtac.ac_functionalities.logs import RTACLogs
import argparse


[docs]class AbstractTournament(ABC): """ Abstract class for tournaments. Parameters ---------- scenario : argparse.Namespace Namespace containing all settings for the RTAC. ta_runner : BaseTARunner Target algorithm runner object. rtac_data : RTACData | RTACDatapp Object containing data and objects necessary throughout the RTAC modules. logs : RTACLogs Object containing loggers and logging functions. """ def __init__(self, scenario: argparse.Namespace, ta_runner: BaseTARunner, rtac_data: RTACData | RTACDatapp, logs: RTACLogs): """ Initializes the tournament class for ReACTR tournaments. If `self.scenario.baselineperf` is set, the configuration used is the default configuration according to the configuration space definition JSON. """ self.scenario = scenario self.ta_runner_class = ta_runner self.rtac_data = rtac_data self.logs = logs if self.scenario.baselineperf: self.dcg = DefaultConfigGen(self.scenario) if self.scenario.gray_box: self.gb_model = None
[docs] @abstractmethod def start_tournament(self, instance: str, contender_dict: dict[str: Configuration], tourn_nr: int) -> None: """ Sets up tournament data and information and starts the tournament with `scenario.number_cores` configured target algorithms running in parallel, using settings according to the RTAC method. Parameters ---------- instance : str Path to the problem instance to solve. contender_dict : dict[str, Configuration] Dictionary containing configurations to run in the tournament, with configuration IDs as keys and Configuration objects as values. tourn_nr : int Number of the tournament during this RTAC run. Returns ------- None """
[docs] @abstractmethod def watch_tournament(self) -> None: """ Function to observe the tournament and enforce the timelimit scenario.timeout if reached according to the RTAC method used. Returns ------- None """
[docs] def close_tournament(self) -> None: """ Initiates termination of all target algorithm runs. Returns ------- None """ self.rtac_data.ev.set() self.rtac_data.event = 1 print(f'\nClosing tournament Nr. {self.tourn_nr}', f'(Tournament ID: {self.tourn_id})', f'due to timeout ({self.scenario.timeout}s) at ', f'{self.currenttime}s.\n') if self.scenario.objective_min: time.sleep(1) # extra time for TAs to shut down and print results for core in range(self.scenario.number_cores): if self.rtac_data.status[core] not in (2, 3): self.rtac_data.status[core] = 5 self.terminate_run(core, self.rtac_data.process[core])
[docs] def terminate_run(self, core: int, process: subprocess.Popen) -> None: """ Enforces termination of a target algorithm run. Parameters ---------- core : int Index of the process in the list of processes. process : subprocess.Popen Target algorithm run process to terminate. Returns ------- None """ if core not in self.terminated_configs: if self.scenario.verbosity == 2: print('Terminating configuration', self.conf_id_list[core], 'running on core', core, 'in tournament', self.tourn_id, '( tournament Nr.', self.tourn_nr, ').') if self.pid_alive(self.rtac_data.pids[core]): try: os.kill(self.rtac_data.pids[core], signal.SIGKILL) except Exception as e: message = \ f'Tried killing pid {self.rtac_data.pids[core]} - ' \ + str(e) \ + ' - It was run with configuration' + \ f' {self.conf_id_list[core]}' self.logs.general_log(message) if process.is_alive(): process.terminate() process.join() self.terminated_configs.append(core)
[docs] def pid_alive(self, pid) -> None: """Checks if process is till alive using PID. Parameters ---------- pid : int Unique identifier for process. Returns ------- None """ try: os.kill(pid, 0) return True except OSError: return False
[docs]class Tournament(AbstractTournament): """ Tournament class with functions needed for ReACTR method tournaments. """
[docs] def start_tournament(self, instance: str, contender_dict: dict[str: Configuration], tourn_nr: int, cores_start: list[int]) -> None: """ Sets up tournament data and starts the tournament with `scenario.number_cores` configured target algorithms running in parallel according to the ReACTR method. Parameters ---------- instance : str Path to the problem instance to solve. contender_dict : dict[str, Configuration] Dictionary containing configurations to run in the tournament, with configuration IDs as keys and Configuration objects as values. tourn_nr : int Number of the tournament during this RTAC run. cores_start : list[int] List of cores which to start the contenders on. Returns ------- None """ self.terminated_configs = [] self.instance = instance self.tourn_nr = tourn_nr if self.scenario.baselineperf: def_conf = self.dcg.generate() contender_dict = {def_conf.id: def_conf} self.config_list = \ [list(contender_dict.values())[i] if i in cores_start else None for i in range(self.scenario.number_cores)] self.conf_id_list = \ [list(contender_dict.keys())[i] if i in cores_start else None for i in range(self.scenario.number_cores)] self.tourn_id = uuid.uuid4().hex self.rtac_data.tournID = self.tourn_id log_message = f'Starting tournament {self.tourn_id}' \ + f' (nr. {self.tourn_nr}) on instance {self.instance}' self.logs.general_log(log_message) self.tournamentstats = \ TournamentStats(self.tourn_id, tourn_nr, self.conf_id_list, None, [], [], [], [], {}) self.sync_event = mp.Event() for core in cores_start: self.ta_runner = \ self.ta_runner_class(self.scenario, self.logs, core) contender = self.config_list[core] self.tournamentstats.TARuns[contender.id] = \ TARun(contender.id, contender.conf, 0, 0, TARunStatus.running) translated_config = self.ta_runner.translate_config(contender) self.rtac_data.process[core] = \ mp.Process(target=self.ta_runner.run, args=[self.instance, translated_config, self.rtac_data, self.sync_event]) # Starting processes for core in cores_start: # range(self.scenario.number_cores): self.rtac_data.process[core].start() self.sync_event.set() time.sleep(0.01) for core in cores_start: set_affinity_recursive(self.rtac_data.process[core], core)
[docs] def fill_tournament(self, cores_start: list[int]) -> None: """ Fills up the remaining cores to be used in an early started tournament. Parameters ---------- cores_start : list[int] List of cores which to start the contenders on. Returns ------- None """ for core in cores_start: self.ta_runner = \ self.ta_runner_class(self.scenario, self.logs, core) contender = self.config_list[core] self.tournamentstats.TARuns[contender.id] = \ TARun(contender.id, contender.conf, 0, 0, TARunStatus.running) translated_config = self.ta_runner.translate_config(contender) self.rtac_data.process[core] = \ mp.Process(target=self.ta_runner.run, args=[self.instance, translated_config, self.rtac_data, self.sync_event]) self.rtac_data.start = time.time() # Starting processes for core in cores_start: self.rtac_data.process[core].start() for core in cores_start: set_affinity_recursive(self.rtac_data.process[core], core)
[docs] def watch_tournament(self) -> None: """ Function to observe the tournament and enforce the timelimit scenario.timeout if reached. Returns ------- None """ while any(proc.is_alive() for proc in self.rtac_data.process): time.sleep(1) # Timeout is int, so checking every second is enough currenttime = time.time() - self.rtac_data.start if currenttime >= self.scenario.timeout: self.currenttime = currenttime self.close_tournament()
[docs]class Tournament_GB: """ Class that contains gray-box tournament functions to be inserted into tournament classes if scenario.gray_box is True. """
[docs] def watch_tournament_gray_box(self, early_tournament=False) -> None: """ Function to observe the tournament and enforce the timelimit scenario.timeout if reached. Parameters ---------- early_tournament : bool True if tournament is early starter, False if not. Returns ------- None """ gb_check_time = time.time() self.gb_pw_inst_archive = [] self.pw_cores = [] self.mtp = {} self.s_instances = [] self.term_list = [] while any(isinstance(p, mp.Process) and p.is_alive() for p in self.rtac_data.process): time.sleep(self.scenario.gb_read_time) currenttime = time.time() - self.rtac_data.start if not early_tournament and not self.terminated_configs: X_pw, cores, self.s_instances, self.gb_pw_inst_archive, \ self.mtp, self.pw_cores = \ self.gray_box.prepare_predict_data(self.rtac_data.rec_data, self.s_instances, self.gb_pw_inst_archive, self.mtp, self.pw_cores) if self.gb_model is not None and len(X_pw) > 2 and \ time.time() \ - gb_check_time >= self.scenario.gb_read_time: pred = self.gray_box.classify_configs( X_pw, self.scenario.number_cores, self.gb_model ) if pred is not None: self.term_list = self.tournamentstats.kills = \ self.gray_box.term_list(pred, cores, self.scenario.verbosity) if self.term_list: self.tm.early_start(currenttime) gb_check_time = time.time() if currenttime >= self.scenario.timeout: self.currenttime = currenttime self.close_tournament()
[docs]class Tournamentpp(Tournament): """ Tournament class with functions needed for ReACTR method tournaments. """
[docs]def tournament_factory(scenario: argparse.Namespace, ta_runner: BaseTARunner, rtac_data: RTACData | RTACDatapp, logs: RTACLogs ) -> Tournament | Tournamentpp: """ Class factory to return the initialized TournamentManager class appropriate to the RTAC method `scenario.ac`. Parameters ---------- scenario : argparse.Namespace Namespace containing all settings for the RTAC. ta_runner : BaseTARunner Target algorithm runner object. rtac_data : RTACData | RTACDatapp Object containing data and objects necessary throughout the RTAC modules. logs : RTACLogs Object containing loggers and logging functions. Returns ------- Tournament or Tournamentpp Initialized Tournament object matching the RTAC method of the scenario. """ if scenario.ac in (ACMethod.ReACTR, ACMethod.CPPL): tournament = Tournament elif scenario.ac is ACMethod.ReACTRpp: tournament = Tournamentpp if scenario.gray_box: tournament.watch_tournament = Tournament_GB.watch_tournament_gray_box return tournament(scenario, ta_runner, rtac_data, logs)
if __name__ == '__main__': pass