"""This module contains classes for tournament management according to the RTAC
method used."""
from abc import ABC, abstractmethod
import argparse
import threading
import time
import copy
import gc
import sys
from rtac.utils.clean_logs import remove_fuse_hidden_files as rfhf
from collections import OrderedDict
from multiprocessing.sharedctypes import Synchronized
from rtac.ac_functionalities.tournament import tournament_factory
from rtac.ac_functionalities.result_processing import processing_factory
from rtac.ac_functionalities.rtac_data import (
TARunStatus,
RTACData,
RTACDatapp,
ACMethod,
TournamentStats
)
from rtac.ac_functionalities.ta_runner import BaseTARunner
from rtac.ac_functionalities.tournament import Tournament, Tournamentpp
from rtac.ac_functionalities.logs import RTACLogs
from rtac.ac_functionalities.ranking.gray_box import Gray_Box
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from rtac.rtac import RTAC
class AbstractTournamentManager(ABC):
    """
    Abstract tournament manager class.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    ta_runner : BaseTARunner
        Target algorithm runner object.
    logs : RTACLogs
        Object containing loggers and logging functions.
    rtac_data : RTACData | RTACDatapp
        Object containing data and objects necessary throughout the RTAC
        modules.
    """

    def __init__(self, scenario: argparse.Namespace, ta_runner: BaseTARunner,
                 logs: RTACLogs, rtac_data: RTACData | RTACDatapp):
        """
        Initialize tournament management class with data and objects
        necessary for the RTAC method used. If `self.scenario.resume` is set,
        data from the last logged tournament is loaded and algorithm
        configuration resumes from that state.
        """
        # Large but finite float.
        # NOTE(review): presumably the value assigned to runs without a
        # result ("Big M" in the log messages) - confirm against the
        # tournament / result processing modules.
        self.huge_float = sys.float_info.max * 1e-100
        self.scenario = scenario
        self.ta_runner = ta_runner
        self.rtac_data = rtac_data
        self.logs = logs
        self.instance_history = []
        self.tournament = tournament_factory(self.scenario, self.ta_runner,
                                             self.rtac_data, self.logs)
        self.res_process = processing_factory(self.scenario, self.logs)

        if self.scenario.resume:
            print('\n')
            print('Resuming from previous run.')
            if self.scenario.ac in (ACMethod.ReACTR, ACMethod.ReACTRpp):
                (self.res_process.pool, self.res_process.scores,
                 self.contender_dict, self.tourn_nr) = self.logs.load_data()
            elif self.scenario.ac is ACMethod.CPPL:
                # NOTE(review): the experimental branch below stores the
                # second item in `res_process.bandit`, here it goes to
                # `res_process.scores` - confirm which one is intended.
                (self.res_process.pool, self.res_process.scores,
                 self.contender_dict, self.tourn_nr,
                 self.bandit_models) = self.logs.load_data()
            self.res_process.contender_dict = self.contender_dict
            # Resuming is a one-off: switch the flag off once data is loaded.
            self.scenario.resume = False
        elif self.scenario.experimental:
            print('\n')
            print('Running in experimental mode.')
            self.tourn_nr = 0
            if self.scenario.ac in (ACMethod.ReACTR, ACMethod.ReACTRpp):
                (self.res_process.pool, self.res_process.scores,
                 self.contender_dict, self.tourn_nr) = \
                    self.logs.load_data(tourn_nr=0)
                self.res_process.contender_dict = self.contender_dict
            elif self.scenario.ac is ACMethod.CPPL:
                (self.res_process.pool, self.res_process.bandit,
                 self.contender_dict, self.tourn_nr,
                 self.bandit_models) = self.logs.load_data(tourn_nr=0)
                self.res_process.contender_dict = self.contender_dict
        else:
            self.tourn_nr = 0
            self.contender_dict = self.res_process.get_contender_dict()

        if self.scenario.gray_box:
            # Number of the early starting tournament; only exists when the
            # gray-box functionality is enabled.
            self.es_tourn_nr = 0

        # History of all tournament numbers handed out so far; the next
        # number is always derived from the last entry.
        self.tourn_nr_list = [self.tourn_nr]
        self.logs.init_rtac_logs()
        self.logs.init_ranking_logs()

    def set_tourn_status(
        self, tournamentstats: TournamentStats,
        rtac_data: RTACData | RTACDatapp,
        tournament: Tournament | Tournamentpp
    ) -> TournamentStats:
        """
        Setting the results of the tournament and status of the target
        algorithm runs to rtac_data.TournamentStats.

        Parameters
        ----------
        tournamentstats : TournamentStats
            Stats summarizing the tournament results.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        tournament : Tournament | Tournamentpp
            Tournament object.

        Returns
        -------
        TournamentStats
            The passed stats object, updated in place with results, times
            and the status of every target algorithm run.
        """
        # Slicing copies the current content of the result containers.
        tournamentstats.results = rtac_data.ta_res[:]
        tournamentstats.times = rtac_data.ta_res_time[:]
        tournamentstats.rtac_times = rtac_data.ta_rtac_time[:]
        if self.scenario.verbosity == 2:
            print('* Tournament:', tournament.tourn_id, 'consisted of',
                  len(tournamentstats.TARuns), 'contenders.')
        # The position of a run in TARuns is used as index into the result
        # containers of rtac_data.
        for tr, tarun in enumerate(tournamentstats.TARuns):
            tournamentstats.TARuns[tarun].res = rtac_data.ta_res[tr]
            tournamentstats.TARuns[tarun].time = \
                rtac_data.ta_res_time[tr]
            tournamentstats.TARuns[tarun].status = \
                TARunStatus(rtac_data.status[tr])
        return tournamentstats

    @abstractmethod
    def solve_instance(self, instance: str, rtac_data: RTACData | RTACDatapp,
                       next_instance: str = None,
                       rtac: 'RTAC' = None) -> RTACData:
        """
        Solve the problem instance according to the RTAC method used.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        next_instance : str
            Problem instance ID. Defaults to None.
        rtac : RTAC
            The self reference of rtac.rtac.RTAC to enable the tournament
            manager to start a tournament run from RTAC.

        Returns
        -------
        RTACData
            Updated object containing data and objects necessary throughout
            the RTAC modules.
        """

    def manage_tournament(self, instance: str,
                          rtac_data: RTACData | RTACDatapp) -> None:
        """
        Manages the procedure of the tournament.

        Starts and watches the tournament, processes and logs its results
        and advances the tournament number.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.

        Returns
        -------
        None
        """
        # Banner announcing the instance on the terminal.
        msg = f'# Start solving instance {instance}... #'
        len_msg = len(msg)
        print('#' * len_msg)
        print('#' + ' ' * (len_msg - 2) + '#')
        print(msg)
        print('#' + ' ' * (len_msg - 2) + '#')
        print('#' * len_msg)

        self.instance = instance
        # A regular tournament starts on all available cores.
        cores_start = list(range(self.scenario.number_cores))
        self.tournament.start_tournament(self.instance, self.contender_dict,
                                         self.tourn_nr, cores_start)
        self.tournament.watch_tournament()

        # Update tournament status
        self.tournamentstats = self.tournament.tournamentstats
        self.tournamentstats.winner = self.rtac_data.winner.value
        self.res_process.process_tourn(self.rtac_data, self.instance)
        self.general_logging(scenario=self.scenario,
                             rtac_data=rtac_data,
                             tournamentstats=self.tournamentstats,
                             tourn_nr=self.tourn_nr,
                             tournament=self.tournament,
                             instance=instance)
        self.contender_dict = self.res_process.get_contender_dict()
        self.instance_history.append(instance)

        # The next number continues from the last one handed out, which is
        # not necessarily self.tourn_nr.
        self.tourn_nr = self.tourn_nr_list[-1] + 1
        self.tourn_nr_list.append(self.tourn_nr)
        self.res_process.tourn_nr = self.tourn_nr
        self.tournament.tourn_nr = self.tourn_nr

    def general_logging(self, scenario: argparse.Namespace = None,
                        rtac_data: RTACData | RTACDatapp = None,
                        tournamentstats: TournamentStats = None,
                        tourn_nr: int = None,
                        tournament: Tournament | Tournamentpp = None,
                        early_tourn: bool = False,
                        instance: str = None) -> None:
        """
        Carries out logging of general information about the tournament.

        If `scenario` is None, `rtac_data`, `tournamentstats`, `tourn_nr`
        and `tournament` are all replaced by the corresponding attributes of
        the manager, regardless of the values passed for them.

        Parameters
        ----------
        scenario : argparse.Namespace
            Namespace containing all settings for the RTAC. Only checked
            for being None; verbosity is always read from `self.scenario`.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        tournamentstats : TournamentStats
            Stats summarizing the tournament results.
        tourn_nr : int
            Number of the tournament.
        tournament : Tournament | Tournamentpp
            Tournament object.
        early_tourn : bool
            True if tournament starts early, False if not.
        instance: str
            ID of the problem instance to be solved.

        Returns
        -------
        None
        """
        if scenario is None:
            rtac_data = self.rtac_data
            tournamentstats = self.tournamentstats
            tourn_nr = self.tourn_nr
            tournament = self.tournament
        print('\n')

        if self.scenario.verbosity == 2:
            if early_tourn:
                message = (
                    '-' * 157 + '\n'
                    + 'Instance ' + str(instance)
                    + ' solved in early starting tournament '
                    + str(tournament.tourn_id) + '.\n\n'
                    + 'Unadjusted time results including time'
                    + ' advantage are:' + '\n\n'
                    + 'Runtimes stated by TA: '
                    + str(rtac_data.ta_res_time[:]) + '\n'
                    + 'Runtimes with TA runner: '
                    + str(rtac_data.ta_rtac_time[:]) + '\n\n'
                    + '* Cancelled configurations are set to timeout.'
                    + '\n\n'
                )
                message += '* The following time results are adjusted to not' \
                    + ' include the extra runtime.\n\n\n'
                message += '- ' * 78 + '\n'
                # From here on the message reports the adjusted times.
                rtac_data = self.adjust_time_results(rtac_data)
            else:
                message = 'Instance ' + str(instance) \
                    + ' solved in tournament ' \
                    + str(tournament.tourn_id) + '.\n\n'
            message += (
                'Objective values: ' + str(rtac_data.ta_res[:])
                + '\n\n'
                + '* Cancelled configurations are set to Big M.' + '\n\n\n'
                + 'Runtimes stated by TA: '
                + str(rtac_data.ta_res_time[:]) + '\n'
                + 'Runtimes with TA runner: '
                + str(rtac_data.ta_rtac_time[:]) + '\n\n'
                + '* Cancelled configurations are set to timeout.\n'
            )
            if early_tourn:
                message += '-' * 157
            message += '\n'
            print(message)
            self.logs.general_log(message)

        tournamentstats = self.set_tourn_status(tournamentstats,
                                                rtac_data,
                                                tournament)
        # NOTE(review): with verbosity 2 the times are adjusted before
        # set_tourn_status copies them, otherwise afterwards - confirm that
        # this difference is intended.
        if self.scenario.verbosity != 2 and early_tourn:
            rtac_data = self.adjust_time_results(rtac_data)
        self.logs.rtac_log(rtac_data, tournamentstats)
        log_message = \
            f'Winner of tournament {tournament.tourn_id}' \
            + f' (nr. {tourn_nr}) is {tournamentstats.winner}'
        self.logs.general_log(log_message)

    def adjust_time_results(
        self, rtac_data: RTACData | RTACDatapp
    ) -> RTACData | RTACDatapp:
        """
        Adjusts the time results of the early tournament by the time that was
        saved from the early starting.

        Relies on `self.finished`, `self.term_list` and `self.currenttime`,
        which are not set by `__init__` of this class and have to exist
        before this method is called.

        Parameters
        ----------
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.

        Returns
        -------
        RTACData | RTACDatapp
            The passed object with `ta_res_time` and `ta_rtac_time` adjusted
            in place.
        """
        # If every run is at the timeout, block until the `finished` event
        # is set before touching the results.
        if all(res == self.scenario.timeout
               for res in rtac_data.ta_res_time[:]):
            self.finished.wait()

        # Only runs on cores in term_list that did not time out are
        # adjusted; the result is clamped at 0 and rounded to 2 digits.
        for i, res in enumerate(rtac_data.ta_res_time):
            if res != float(self.scenario.timeout) and i in self.term_list:
                rtac_data.ta_res_time[i] = \
                    max(0,
                        round(res - min(
                            self.rtac_data.ta_res_time) + self.currenttime, 2
                        ))
        for i, res in enumerate(rtac_data.ta_rtac_time):
            if res != float(self.scenario.timeout) and i in self.term_list:
                rtac_data.ta_rtac_time[i] = \
                    max(0,
                        round(res - min(
                            self.rtac_data.ta_rtac_time) + self.currenttime, 2
                        ))
        return rtac_data

    def get_tourn_nr(self, rtac_data: RTACData | RTACDatapp) -> int:
        """
        Get the number of the tournament to be logged.

        Parameters
        ----------
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.

        Returns
        -------
        int
            `self.es_tourn_nr` if `rtac_data.early_start_tournament` is set,
            `self.tourn_nr` otherwise.
        """
        if not bool(rtac_data.early_start_tournament.value):
            tourn_nr = self.tourn_nr
        else:
            tourn_nr = self.es_tourn_nr
        return tourn_nr
class TournamentManager(AbstractTournamentManager):
    """
    Tournament manager used with the ReACTR implementation.
    """

    def solve_instance(self, instance: str, rtac_data: RTACData | RTACDatapp,
                       **kwargs) -> RTACData:
        """
        Run one tournament on the given instance the ReACTR way.

        The current pool and scores are written to the ranking log before
        the tournament itself is managed.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the rtac
            modules.
        **kwargs
            Additional keyword arguments that vary by RTAC method. They are
            stored on the manager as `self.kwargs`.

        Returns
        -------
        RTACData
            The manager's `rtac_data` after the tournament has been managed.
        """
        # Manager and tournament work on the very same data object.
        self.rtac_data = rtac_data
        self.tournament.rtac_data = rtac_data

        # Number under which the ranking is logged.
        ranking_tourn_nr = self.get_tourn_nr(rtac_data)
        self.kwargs = kwargs

        processing = self.res_process
        self.logs.ranking_log(
            processing.pool,
            processing.scores,
            ranking_tourn_nr,
            self.contender_dict,
        )

        self.manage_tournament(instance, rtac_data)
        return self.rtac_data
class TournamentManagerCPPL(AbstractTournamentManager):
    """
    Tournament manager used with the CPPL implementation.
    """

    def solve_instance(self, instance: str, rtac_data: RTACData | RTACDatapp,
                       **kwargs) -> RTACData:
        """
        Run one tournament on the given instance the CPPL way.

        The current pool, bandit and bandit models are written to the
        ranking log before the tournament itself is managed.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the rtac
            modules.
        **kwargs
            Additional keyword arguments that vary by RTAC method. They are
            stored on the manager as `self.kwargs`.

        Returns
        -------
        RTACData
            The manager's `rtac_data` after the tournament has been managed.
        """
        # Manager and tournament work on the very same data object.
        self.rtac_data = rtac_data
        self.tournament.rtac_data = rtac_data

        # Number under which the ranking is logged.
        ranking_tourn_nr = self.get_tourn_nr(rtac_data)
        self.kwargs = kwargs

        processing = self.res_process
        self.logs.ranking_log(
            processing.pool,
            processing.bandit,
            ranking_tourn_nr,
            self.contender_dict,
            bandit_models=processing.bandit_models,
        )

        self.manage_tournament(instance, rtac_data)
        return self.rtac_data
class GrayBox:
    """
    This class contains functions needed for the gray-box
    functionality of the tournament manager classes.

    The methods are not meant to be used on a GrayBox instance: they are
    copied onto a tournament manager class by `tourn_manager_factory`, so
    `self` is a tournament manager and attributes such as `scenario`,
    `tournament`, `res_process`, `rtac_data` or `tourn_nr_list` are expected
    to be provided by it.
    """

    def train_gray_box_model(self) -> None:
        """
        Train gray-box model with the total data gathered so far.

        Nothing is trained if the tournament's `gb_pw_inst_archive` is
        empty.

        Returns
        -------
        None
        """
        self.gb_pw_inst_archive = self.tournament.gb_pw_inst_archive
        if self.gb_pw_inst_archive:
            cores, s_instances = self.tournament.pw_cores, \
                self.tournament.s_instances
            # Runtime is the result unless an objective value is minimized.
            if not self.scenario.objective_min:
                res = list(self.rtac_data.ta_res_time)
            else:
                res = list(self.rtac_data.ta_res)
            # Index (core) of the run with the smallest result.
            winner = res.index(min(res))
            self.X_train, self.y_train, self.cost_mat_train = \
                self.gray_box.prepare_train_data(
                    self.X_train, self.gb_pw_inst_archive, cores, winner, res,
                    s_instances, self.y_train, self.cost_mat_train
                )
            self.gb_model = \
                self.gray_box.train_gb(self.X_train, self.y_train,
                                       self.cost_mat_train,
                                       self.scenario.number_cores)
            # Pass TournamentManager object reference to Tournament object
            # so it can call early_start, etc.
            self.tournament.tm = self
            self.tournament.gray_box = self.gray_box
            self.tournament.gb_model = self.gb_model

    def manage_tournament(self, instance: str,
                          rtac_data: RTACData | RTACDatapp,
                          **kwargs) -> None:
        """
        Manages the procedure of the tournament. Runs early tournament if
        CPU cores were freed by the gray-box model.

        An instance that is already in `self.instance_history` is skipped
        and `self.rtac_data.skip` is set to True.

        Parameters
        ----------
        instance : str
            Path to the problem instance to solve.
        rtac_data : RTACData | RTACDatapp
            Object containing data and objects necessary throughout the RTAC
            modules.
        **kwargs
            Additional keyword arguments that vary by RTAC method. They are
            accepted but not read here: the options 'es_rtac_data', 'rtac'
            and 'next_instance' are taken from `self.kwargs`.

        Returns
        -------
        None
        """
        if instance not in self.instance_history:
            self.instance_history.append(instance)
            if self.kwargs['es_rtac_data'] is None:
                # ---- regular tournament on all cores ----
                msg = f'# Start solving instance {instance}... #'
                len_msg = len(msg)
                print('#' * len_msg)
                print('#' + ' ' * (len_msg - 2) + '#')
                print(msg)
                print('#' + ' ' * (len_msg - 2) + '#')
                print('#' * len_msg)
                self.rtac_data = rtac_data
                cores_start = list(range(self.scenario.number_cores))
                # Set once this tournament is done, see wait_for_event.
                self.finished = threading.Event()
                self.rtac = self.kwargs['rtac']
                # Only the first element of 'next_instance' is used.
                self.next_instance = \
                    self.kwargs['next_instance'][0] \
                    if self.kwargs['next_instance'] \
                    else None
                # Fresh gray-box object and training data per tournament.
                self.gray_box = Gray_Box()
                self.tournament.gray_box = self.gray_box
                self.tournament.tm = self
                self.X_train = []
                self.y_train, self.cost_mat_train = [], []
                self.tournament.terminated_configs = []
                self.instance = instance
                self.rtac_data = self.tournament.rtac_data = rtac_data
                self.tournament.start_tournament(self.instance,
                                                 self.contender_dict,
                                                 self.tourn_nr, cores_start)
                self.tournament.watch_tournament()
                self.tournamentstats = self.tournament.tournamentstats
                self.tournamentstats.winner = self.rtac_data.winner.value
                # Only process tournament if it was a regular one
                self.res_process.process_tourn(self.rtac_data, self.instance,
                                               self.tourn_nr)
                self.general_logging(instance=self.instance)
                self.contender_dict = self.res_process.get_contender_dict()
                self.train_gray_box_model()
                # Continue numbering after the last number handed out, which
                # may belong to an early starting tournament.
                self.tourn_nr = self.tourn_nr_list[-1] + 1
                self.tourn_nr_list.append(self.tourn_nr)
                self.res_process.tourn_nr = self.tourn_nr
                self.tournament.tourn_nr = self.tourn_nr
                self.finished.set()
                gc.collect()
            else:
                # ---- early starting tournament on the freed cores ----
                msg = f'# Start solving instance {instance} ' + \
                    'with time advantage... #'
                len_msg = len(msg)
                print('#' * len_msg)
                print('#' + ' ' * (len_msg - 2) + '#')
                print(msg)
                print('#' + ' ' * (len_msg - 2) + '#')
                print('#' * len_msg)
                self.es_rtac_data = self.kwargs['es_rtac_data']
                # Start only on cores whose runs were terminated.
                cores_start = \
                    [core for core in range(self.scenario.number_cores)
                     if core in self.term_list]
                self.next_instance = instance
                # The early tournament works on its own scenario copy with
                # the timeout extended by the time advantage.
                self.es_scenario = copy.deepcopy(self.scenario)
                self.es_scenario.timeout = \
                    self.scenario.timeout + self.time_advantage
                self.es_tournament = tournament_factory(self.es_scenario,
                                                        self.ta_runner,
                                                        rtac_data,
                                                        self.logs)
                self.es_tournament.rtac_data = self.es_rtac_data
                contender_dict = self.res_process.get_contender_dict()
                # Put best contender at indices of term_list
                items = list(contender_dict.items())
                num_to_move = len(self.term_list)
                moved_items = items[:num_to_move]
                remaining_items = items[num_to_move:]
                for i, idx in enumerate(sorted(self.term_list)):
                    remaining_items.insert(idx, moved_items[i])
                contender_dict = OrderedDict(remaining_items)
                self.es_tournament.start_tournament(self.next_instance,
                                                    contender_dict,
                                                    self.es_tourn_nr,
                                                    cores_start)
                self.es_tournament.tourn_nr = self.es_tourn_nr
                self.es_tournament.watch_tournament(early_tournament=True)
                self.es_tournamentstats = self.es_tournament.tournamentstats
                self.es_tournamentstats.winner = \
                    self.es_rtac_data.winner.value
                self.general_logging(scenario=self.es_scenario,
                                     rtac_data=self.es_rtac_data,
                                     tournamentstats=self.es_tournamentstats,
                                     tourn_nr=self.es_tourn_nr,
                                     tournament=self.es_tournament,
                                     early_tourn=True,
                                     instance=self.next_instance)
                self.early_rtac_data.newtime = \
                    min(self.es_rtac_data.ta_res_time[:])
                # Collect one result per core: objective value or runtime.
                if self.scenario.objective_min:
                    per_core = self.es_rtac_data.ta_res
                else:
                    per_core = self.es_rtac_data.ta_res_time
                results = [per_core[core]
                           for core in range(self.scenario.number_cores)]
                self.res_process.\
                    result_summary_terminal(results, self.es_tourn_nr)
                self.es_output(self.next_instance)
                self.es_rtac_data.early_start_tournament.value = False
                self.rtac_data.early_start_tournament.value = False
                self.early_finished.set()
                gc.collect()
            self.rtac_data.skip = False
            rfhf(self.scenario.log_folder)
        else:
            # Skipping problem instance if it was already solved in an early
            # starting tournament
            self.rtac_data.skip = True

    def early_start(self, currenttime: float) -> None:
        """
        Starts early tournament with only as many contenders as were
        terminated by the gray-box model.

        Nothing besides copying the tournament's `term_list` happens if no
        next instance is known.

        Parameters
        ----------
        currenttime : float
            Time in the tournament at which contenders were terminated.

        Returns
        -------
        None
        """
        self.term_list = copy.deepcopy(self.tournament.term_list)
        if self.next_instance:
            print('\n')
            print('Next problem instance was provided!\n')
            self.currenttime = currenttime
            self.early_finished = threading.Event()
            for c in self.term_list:
                self.rtac_data.status[c] = 4  # TARunStatus.terminated
                self.tournament.terminate_run(c, self.rtac_data.process[c])
            # Timeout minus the whole seconds elapsed since the latest
            # entry of substart_wall.
            self.time_advantage = \
                int(self.scenario.timeout - int(time.time() - (
                    max(self.tournament.rtac_data.substart_wall)
                )))
            self.tournament.terminated_configs = \
                copy.deepcopy(self.term_list)
            self.early_rtac_data = self.rtac_data.early_rtac_copy()
            for c in range(self.scenario.number_cores):
                if c not in self.term_list:
                    # TARunStatus.awaiting_start -> 6
                    self.early_rtac_data.status[c] = 6
            self.early_rtac_data.early_start_tournament.value = True
            self.rtac_data.early_start_tournament.value = True
            self.early_rtac_data.cores_start = copy.deepcopy(self.term_list)
            self.es_tourn_nr = self.tourn_nr_list[-1] + 1
            self.tourn_nr_list.append(self.es_tourn_nr)
            print('\n')
            print(f'Letting contenders start early at {currenttime}s',
                  'of runtime of previous tournament on freed cores.')
            self.rtac.solve_instance(self.next_instance,
                                     None, self.early_rtac_data)
            # Fill up the early tournament in the background as soon as
            # the regular tournament has finished.
            threading.Thread(target=self.wait_for_event, daemon=True).start()

    def wait_for_event(self) -> None:
        """
        Waits for the tournament to finish and triggers the start of the
        remaining cores in the early tournament.

        Returns
        -------
        None
        """
        self.finished.wait()
        self.fill_early_tournament()

    def fill_early_tournament(self) -> None:
        """
        Manages and starts the remaining contenders in the early tournament.

        Does nothing if the early tournament has already finished.

        Returns
        -------
        None
        """
        if not self.early_finished.is_set():
            self.es_tournament.contender_dict = \
                self.res_process.get_contender_dict()
            # The remaining cores are those not already used by the early
            # tournament.
            self.early_rtac_data.cores_start = \
                [core for core in range(self.scenario.number_cores)
                 if core not in self.term_list]
            cores_to_fill = self.early_rtac_data.cores_start
            # Materialize keys and values once instead of once per core.
            contenders = self.es_tournament.contender_dict
            configs = list(contenders.values())
            conf_ids = list(contenders.keys())
            # Cores to fill get the contender at their own index, all other
            # cores keep the configuration they are already running.
            self.es_tournament.config_list = \
                [configs[i]
                 if i in cores_to_fill
                 else self.es_tournament.config_list[i]
                 for i in range(self.scenario.number_cores)]
            self.es_tournament.conf_id_list = \
                [conf_ids[i]
                 if i in cores_to_fill
                 else self.es_tournament.config_list[i].id
                 for i in range(self.scenario.number_cores)]
            # The late starters run with the regular timeout.
            self.es_tournament.scenario.timeout = self.scenario.timeout
            self.es_tournament.fill_tournament(
                self.early_rtac_data.cores_start
            )

    def es_output(self, instance: str) -> None:
        """
        Prints info from early tournament to terminal.

        Parameters
        ----------
        instance : str
            ID of the instance to be solved in the early starting tournament.

        Returns
        -------
        None
        """
        print('\n')
        if not self.es_scenario.objective_min:
            # newtime may be a shared value wrapper or a plain number.
            if isinstance(self.early_rtac_data.newtime, Synchronized):
                newtime = self.early_rtac_data.newtime.value
            else:
                newtime = self.early_rtac_data.newtime
            if newtime >= self.es_scenario.timeout:
                print(f'Instance {instance} could not be solved within',
                      f'{self.es_scenario.timeout}s.')
            else:
                # Print the unwrapped number, not the wrapper object.
                print(f'Solved instance {instance} in',
                      f'{newtime}s.')
        else:
            if self.early_rtac_data.best_res == self.huge_float:
                print(f'Instance {instance} could not be solved within',
                      f'{self.es_scenario.timeout}s.')
            else:
                print(f'Solved instance {instance} with objective value',
                      f'{self.early_rtac_data.best_res}.')
        print('.\n' * 3)
def tourn_manager_factory(scenario: argparse.Namespace,
                          ta_runner: BaseTARunner, logs: RTACLogs,
                          rtac_data: RTACData | RTACDatapp
                          ) -> TournamentManager | TournamentManagerCPPL:
    """
    Class factory to return the initialized TournamentManager class
    appropriate to the RTAC method scenario.ac.

    If `scenario.gray_box` is set, the methods of `GrayBox` are attached to
    the selected manager class before it is instantiated. The class object
    itself is modified, so the attached methods stay in place for every
    later use of that class.

    Parameters
    ----------
    scenario : argparse.Namespace
        Namespace containing all settings for the RTAC.
    ta_runner : BaseTARunner
        Target algorithm runner object.
    logs : RTACLogs
        Object containing loggers and logging functions.
    rtac_data : RTACData | RTACDatapp
        Object containing data and objects necessary throughout the RTAC
        modules.

    Returns
    -------
    TournamentManager or TournamentManagerCPPL
        Initialized TournamentManager object matching the RTAC method of the
        scenario.

    Raises
    ------
    ValueError
        If `scenario.ac` is none of the supported RTAC methods.
    """
    if scenario.ac in (ACMethod.ReACTR, ACMethod.ReACTRpp):
        tourn_manager = TournamentManager
    elif scenario.ac is ACMethod.CPPL:
        tourn_manager = TournamentManagerCPPL
    else:
        # Fail with a clear message instead of an unbound local variable.
        raise ValueError(f'Unsupported RTAC method: {scenario.ac!r}')

    if scenario.gray_box:
        # Copy every non-dunder callable of GrayBox onto the manager class;
        # this also replaces the inherited manage_tournament.
        for name, func in vars(GrayBox).items():
            if callable(func) and not name.startswith("__"):
                setattr(tourn_manager, name, func)

    return tourn_manager(scenario, ta_runner, logs, rtac_data)
if __name__ == '__main__':
    # Running this module as a script intentionally does nothing.
    pass