"""This module contains functions for logging the data between
instances/tournaments as well as stats about toournaments and results."""
from abc import ABC, abstractmethod
from typing import Any, Optional
import argparse
import os
import logging
import joblib
import numpy as np
import copy
from logging.handlers import RotatingFileHandler, BaseRotatingHandler
from rtac.ac_functionalities.rtac_data import (
Configuration,
RTACData,
RTACDatapp,
TournamentStats,
ACMethod,
Generator
)
__all__ = ('Configuration')
[docs]class NewRotatingFileHandler(RotatingFileHandler):
"""Overwriting logging.handlers.RotatingFileHandler in order to log to the
same line in the file."""
def __init__(self, filename, mode='w', maxBytes=0, backupCount=0):
BaseRotatingHandler.__init__(self, filename, mode, encoding=None,
delay=False)
self.maxBytes = maxBytes
self.backupCount = backupCount
[docs]class AbstractLogs(ABC):
"""
Class with all functions and loggers concerning logging and loading
RTAC and tournament data.
Parameters
----------
scenario : argparse.Namespace
Namespace containing all settings for the RTAC.
"""
def __init__(self, scenario: argparse.Namespace):
"""
Initializes logging class, check if log directory exists and create
it if needed.
"""
self.scenario = scenario
self.ranking = scenario.ac
if not os.path.isdir(scenario.log_folder):
os.makedirs(scenario.log_folder)
self.log_path = scenario.log_folder + '/' \
+ scenario.wrapper_name + '_' \
+ str(scenario.ac).split('.')[1]
if scenario.gray_box:
self.log_path += '_gb'
if not os.path.isdir(self.log_path):
os.makedirs(self.log_path)
self.experimental = scenario.experimental
if not scenario.resume:
if not self.experimental:
filelist = \
[f for f in os.listdir(self.log_path)
if f.endswith('.log')]
else:
filelist = \
[f for f in os.listdir(self.log_path)
if f.endswith('.log') and 'tourn_0' not in f]
for f in filelist:
os.remove(os.path.join(self.log_path, f))
self.objective_min = scenario.objective_min
print('\n')
print(f'Logging to {self.log_path}')
[docs] def init_rtac_logs(self) -> None:
"""
Initializes loggers for realtime algorithm configuration data
that are shared by all methods.
Returns
-------
None
"""
if not self.objective_min:
self.times = {}
else:
self.results = {}
# Set up general logging
self.main_log = logging.getLogger('main_log')
self.main_log.setLevel(logging.INFO)
g_fh = logging.FileHandler(f'{self.log_path}/general.log')
g_fh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(message)s',
'%d/%m/%Y %H:%M:%S:')
g_fh.setFormatter(formatter)
self.main_log.addHandler(g_fh)
# Set up winner trajectory logging
self.winner_trajectory = logging.getLogger('winner_trajectory_log')
self.winner_trajectory.setLevel(logging.INFO)
wt_fh = logging.FileHandler(f'{self.log_path}/winner.log')
wt_fh.setLevel(logging.INFO)
wt_fh.setFormatter(formatter)
self.winner_trajectory.addHandler(wt_fh)
# Set up tournament stats logging
self.tourn_stats_log = logging.getLogger('tourn_stats_log')
self.tourn_stats_log.setLevel(logging.INFO)
ts_fh = logging.FileHandler(f'{self.log_path}/tourn_stats.log')
ts_fh.setLevel(logging.INFO)
ts_fh.setFormatter(formatter)
self.tourn_stats_log.addHandler(ts_fh)
# Set up logging of the last tournament number
self.tourn_nr_log = logging.getLogger('tourn_nr_log')
self.tourn_nr_log.setLevel(logging.INFO)
tn_fh = NewRotatingFileHandler(f'{self.log_path}/tourn_nr.log',
mode='w', maxBytes=1, backupCount=0)
tn_fh.setLevel(logging.INFO)
tn_fh.suffix = ""
streamformatter = logging.Formatter(fmt='%(message)s')
tn_fh.setFormatter(streamformatter)
self.tourn_nr_log.addHandler(tn_fh)
if not self.objective_min:
# Set up ta winning runtime logging
self.times_log = logging.getLogger('times_log')
self.times_log.setLevel(logging.INFO)
t_fh = NewRotatingFileHandler(f'{self.log_path}/times.log',
mode='w', maxBytes=1, backupCount=0)
t_fh.setLevel(logging.INFO)
t_fh.suffix = ""
streamformatter = logging.Formatter(fmt='%(message)s')
t_fh.setFormatter(streamformatter)
self.times_log.addHandler(t_fh)
else:
# Set up ta results logging
self.results_log = logging.getLogger('results_log')
self.results_log.setLevel(logging.INFO)
r_fh = NewRotatingFileHandler(f'{self.log_path}/results.log',
mode='w', maxBytes=1, backupCount=0)
r_fh.setLevel(logging.INFO)
r_fh.suffix = ""
r_fh.setFormatter(streamformatter)
self.results_log.addHandler(r_fh)
# Set up tournament contender list logging
self.contender_dict_log = logging.getLogger('contender_dict_log')
self.contender_dict_log.setLevel(logging.INFO)
cl_fh = \
logging.FileHandler(f'{self.log_path}/contender_dict_tourn_0.log')
cl_fh.setLevel(logging.INFO)
self.contender_dict_log.addHandler(cl_fh)
[docs] def general_log(self, message: str) -> None:
"""Log message.
Parameters
----------
message : str
Any message provided as a string.
Returns
-------
None
"""
self.main_log.info(f'{message}')
[docs] def scenario_log(self, scenario: argparse.Namespace) -> None:
"""Save RTAC scenario.
Parameters
----------
scenario : argparse.Namespace
Namespace containing all settings for the RTAC.
Returns
-------
None
"""
with open(f'{self.log_path}/scenario.log', 'w') as sf:
sf.write(str(scenario))
[docs] def rtac_log(self, rtac_data: RTACData | RTACDatapp,
tourn_stats: TournamentStats) -> None:
"""
Logs for realtime algorithm configuration data
concerning all methods.
Parameters
----------
rtac_data : RTACData | RTACDatapp
Object containing data and objects necessary throughout the rtac
modules.
tourn_stats : TournamentStats
Object containing statistics about the previous tournament.
Returns
-------
None
"""
self.winner_trajectory.info(f'{rtac_data.winner.value}' + '\n')
self.tourn_stats_log.info(str(tourn_stats) + '\n')
self.tourn_nr_log.info(str(tourn_stats.tourn_nr) + '\n')
if not self.objective_min:
self.times[tourn_stats.id] = min(rtac_data.ta_res_time[:])
self.times_log.info(str(self.times) + '\n')
else:
self.results[tourn_stats.id] = min(rtac_data.ta_res[:])
self.results_log.info(str(self.results) + '\n')
[docs] @abstractmethod
def load_data(self) -> Any:
"""Loads the data of either last logged, or first tournament."""
[docs]class RTACLogs(AbstractLogs):
"""
Class with all functions and loggers concerning logging and loading
RTAC and tournament data for ReACTR implementation.
"""
[docs] def init_ranking_logs(self) -> None:
"""
Initializes loggers for data concerning ReACTR.
Returns
-------
None
"""
# Set up pool logging
self.pool_log = logging.getLogger('pool_log')
self.pool_log.setLevel(logging.INFO)
p_fh = logging.FileHandler(f'{self.log_path}/pool_tourn_0.log')
p_fh.setLevel(logging.INFO)
self.pool_log.addHandler(p_fh)
if self.ranking in (ACMethod.ReACTR, ACMethod.ReACTRpp):
# Set up trueskill scores logging
self.scores_log = logging.getLogger('scores_log')
self.scores_log.setLevel(logging.INFO)
s_fh = logging.FileHandler(f'{self.log_path}/scores_tourn_0.log')
s_fh.setLevel(logging.INFO)
self.scores_log.addHandler(s_fh)
elif self.ranking is ACMethod.CPPL:
# Set up bandit logging
self.bandit_log = logging.getLogger('bandit_log')
self.bandit_log.setLevel(logging.INFO)
b_fh = logging.FileHandler(f'{self.log_path}/bandit_tourn_0.log')
b_fh.setLevel(logging.INFO)
self.bandit_log.addHandler(b_fh)
[docs] def ranking_log(self, pool: dict[str, Configuration],
assessment: dict[str, Any], tourn_nr: int,
contender_dict: dict[str, Configuration],
**kwargs) -> None:
"""
Logs data concerning RAC method.
Parameters
----------
pool : dict[str, Configuration]
Dictionary with configuration id as key and configuration
as value with scenario.contenders == #items.
assessment : dict[str, Any]
Dictionary with configuration id as key and
assessment depending on the AC method used, e.g., trueskill scores,
or bandit model.
tourn:nr : int
Number of tournament after which logs are done.
contender_dict : dict[str, Configuration]
Dictionary with configuration id as key and
configuration as value: contenders of the previous tournament.
**kwargs
Additional keyword arguments. Possible keys include:
- `standard_scaler` sklearn.preprocessing.StandardScaler
- `min_max_scaler` sklearn.preprocessing.MinMaxScaler
- `pca_obj_params` sklearn.decomposition.PCA
Returns
-------
None
"""
self.contender_dict_log.handlers.clear()
cl_fh = logging.FileHandler(
f'{self.log_path}/contender_dict_tourn_{tourn_nr}.log')
cl_fh.setLevel(logging.INFO)
self.contender_dict_log.addHandler(cl_fh)
self.contender_dict_log.info(str(list(contender_dict.keys())))
self.pool_log.handlers.clear()
p_fh = logging.FileHandler(
f'{self.log_path}/pool_tourn_{tourn_nr}.log')
p_fh.setLevel(logging.INFO)
self.pool_log.addHandler(p_fh)
serializable_pool = copy.deepcopy(pool)
for conf in serializable_pool.values():
conf.gen = conf.gen.name
self.pool_log.info(str(serializable_pool))
if self.ranking in (ACMethod.ReACTR, ACMethod.ReACTRpp):
self.scores_log.handlers.clear()
s_fh = logging.FileHandler(
f'{self.log_path}/scores_tourn_{tourn_nr}.log')
s_fh.setLevel(logging.INFO)
self.scores_log.addHandler(s_fh)
self.scores_log.info(str(assessment))
elif self.ranking is ACMethod.CPPL:
if not os.path.isdir(f'{self.log_path}/bandit_models'):
os.mkdir(f'{self.log_path}/bandit_models')
self.bm_path = f'{self.log_path}/bandit_models'
bandit_models = kwargs['bandit_models']
if tourn_nr == 0:
joblib.dump(
bandit_models['standard_scaler'],
f'{self.bm_path}/standard_scaler_{tourn_nr}.pkl')
joblib.dump(
bandit_models['min_max_scaler'],
f'{self.bm_path}/min_max_scaler_{tourn_nr}.pkl')
joblib.dump(
bandit_models['one_hot_encoder'],
f'{self.bm_path}/one_hot_encoder_{tourn_nr}.pkl')
joblib.dump(
bandit_models['pca_obj_params'],
f'{self.bm_path}/pca_obj_params_{tourn_nr}.pkl')
joblib.dump(
bandit_models['pca_obj_inst'],
f'{self.bm_path}/pca_obj_inst_{tourn_nr}.pkl')
elif self.scenario.online_instance_train and tourn_nr > 0:
joblib.dump(
bandit_models['standard_scaler'],
f'{self.bm_path}/standard_scaler_{tourn_nr}.pkl')
joblib.dump(
bandit_models['pca_obj_inst'],
f'{self.bm_path}/pca_obj_inst_{tourn_nr}.pkl')
elif len(bandit_models) == 0:
pass
self.bandit_log.handlers.clear()
b_fh = logging.FileHandler(
f'{self.log_path}/bandit_tourn_{tourn_nr}.log')
b_fh.setLevel(logging.INFO)
self.bandit_log.addHandler(b_fh)
self.bandit_log.info(str(assessment))
[docs] def parse_array(self, val: str) -> int | float:
"""
Helper function for loading logs of nd.arrays.
Parameters
----------
val : str
Loaded string to decode and transform.
Returns
-------
int or float
Decoded and transformed form of val.
Raises
------
ValueError
If val could not be parsed.
"""
if not isinstance(val, str):
return val
val = val.strip()
# Array-like: [1. 2. 3.]
if val.startswith('[') and val.endswith(']'):
val = val.strip("[]\n")
return np.fromstring(val, sep=' ')
# Scalar string: try to convert to int or float
try:
return int(val) if '.' not in val else float(val)
except ValueError:
raise ValueError(f"Cannot parse value: {val}")
[docs] def load_data(self, tourn_nr: int | None = None) \
-> tuple[dict[str, Configuration], dict[str, Any],
dict[str, Configuration], int,
Optional[Any]]:
"""
Loads data necessary for resuming the algorithm configuration from
last logged state of ReACTR.
Parameters
----------
tourn_nr : int | None
Either int to load logs of tournament nr. tourn_nr or None if
loading tournament nr. 0 for experimental mode.
Returns
-------
tuple
- **pool** : dict[str, Configuration],
Configuration pool.
- **assessment** : dict[str, Any],
Scores/ Skills, confidences of logged tournament.
- **contender_dict** : dict[str, Configuration],
List of contending Configurations from logged tournament.
- **tourn_nr** : int,
Number of logged tournament.
- **bandit_models** : dict[str, Any],
All objects needed for CPPL model employment.
"""
if tourn_nr is None:
with open(f'{self.log_path}/tourn_nr.log') as f:
tourn_nr = int(f.readline().strip())
with open(f'{self.log_path}/pool_tourn_{tourn_nr}.log', 'r') as f:
line = f.readline()
pool = eval(line)
for conf in pool.values():
conf.gen = Generator[conf.gen]
if self.ranking in (ACMethod.ReACTR, ACMethod.ReACTRpp):
with open(
f'{self.log_path}/scores_tourn_{tourn_nr}.log', 'r') as f:
assessment = eval(f.readline())
if self.scenario.experimental:
assessment = dict(zip(list(pool.keys()), assessment.values()))
elif self.ranking is ACMethod.CPPL:
self.bm_path = f'{self.log_path}/bandit_models'
if not self.scenario.online_instance_train:
tourn_nr = 0
with open(
f'{self.log_path}/bandit_tourn_{tourn_nr}.log', 'r') as f:
assessment = f.read()
assessment = eval(assessment, {"array": np.array})
assessment = \
{k: self.parse_array(v) for k, v in assessment.items()}
standard_scaler = \
joblib.load(f'{self.bm_path}/standard_scaler_{tourn_nr}.pkl')
min_max_scaler = \
joblib.load(f'{self.bm_path}/min_max_scaler_0.pkl')
one_hot_encoder = \
joblib.load(f'{self.bm_path}/one_hot_encoder_0.pkl')
pca_obj_params = \
joblib.load(f'{self.bm_path}/pca_obj_params_0.pkl')
pca_obj_inst = \
joblib.load(f'{self.bm_path}/pca_obj_inst_{tourn_nr}.pkl')
bandit_models = {'standard_scaler': standard_scaler,
'min_max_scaler': min_max_scaler,
'one_hot_encoder': one_hot_encoder,
'pca_obj_params': pca_obj_params,
'pca_obj_inst': pca_obj_inst}
with open(f'{self.log_path}/contender_dict_tourn_{tourn_nr}.log',
'r') as f:
contender_ids = eval(f.readline())
if self.experimental:
if self.ranking in (ACMethod.ReACTR, ACMethod.ReACTRpp):
os.remove(f'{self.log_path}/scores_tourn_{tourn_nr}.log')
elif self.ranking is ACMethod.CPPL:
os.remove(f'{self.log_path}/bandit_tourn_{tourn_nr}.log')
filelist = [f for f in os.listdir(self.bm_path)]
for f in filelist:
os.remove(os.path.join(self.bm_path, f))
os.remove(f'{self.log_path}/pool_tourn_{tourn_nr}.log')
os.remove(f'{self.log_path}/contender_dict_tourn_{tourn_nr}.log')
contender_dict = {}
for ci in contender_ids:
contender_dict[ci] = pool[ci]
if self.ranking in (ACMethod.ReACTR, ACMethod.ReACTRpp):
return pool, assessment, contender_dict, tourn_nr
elif self.ranking is ACMethod.CPPL:
return pool, assessment, contender_dict, tourn_nr, bandit_models
if __name__ == '__main__':
pass