Source code for rtac.ac_functionalities.ranking.gray_box

"""
Implements gray-box model based on cost-sensitive random forest from CostCla.
"""

import six
import sys
import joblib
import numpy as np
np.float = float
from sklearn import __version__ as sklearn_version
from packaging import version
from collections import Counter
import sklearn.ensemble._base as base
# Replacing/adding modules used in costcla based on deprecated sklearn modules
sys.modules['sklearn.ensemble.base'] = base
sys.modules['sklearn.externals.joblib'] = joblib
sys.modules['sklearn.externals.six'] = six
sys.modules['sklearn.externals.six.moves'] = six.moves

import multiprocessing
import costcla.models.bagging as bagging_mod


# Patching np.int to int
def _patched_partition_estimators(n_estimators, n_jobs):
    """Patched version of _partition_estimators with np.int replaced by int."""
    if n_jobs == -1:
        n_jobs = min(multiprocessing.cpu_count(), n_estimators)
    else:
        n_jobs = min(n_jobs, n_estimators)

    n_estimators_per_job = \
        (n_estimators // n_jobs) * np.ones(n_jobs, dtype=int)
    n_estimators_per_job[:n_estimators % n_jobs] += 1
    starts = np.cumsum(n_estimators_per_job)

    return n_jobs, n_estimators_per_job.tolist(), [0] + starts.tolist()


bagging_mod._partition_estimators = _patched_partition_estimators

from sklearn.utils import column_or_1d
import costcla.metrics.costs as costs_mod


# Patching np.float to float
def _patched_cost_loss(y_true, y_pred, cost_mat):
    """Patched version of cost_loss with np.float replaced by float."""
    y_true = column_or_1d(y_true)
    y_true = (y_true == 1).astype(float)
    y_pred = column_or_1d(y_pred)
    y_pred = (y_pred == 1).astype(float)
    cost = y_true * ((1 - y_pred) * cost_mat[:, 1] + y_pred * cost_mat[:, 2])
    cost += (
        (1 - y_true) * (
            y_pred * cost_mat[:, 0] + (1 - y_pred) * cost_mat[:, 3]))
    return np.sum(cost)


costs_mod.cost_loss = _patched_cost_loss

from costcla.models.bagging import BaseBagging


# Patching CostCla to work with sklearn >= 1.2
_original_init = BaseBagging.__init__


def _patched_init(self,
                  base_estimator=None,
                  n_estimators=10,
                  max_samples=1.0,
                  max_features=1.0,
                  bootstrap=True,
                  bootstrap_features=False,
                  combination='majority_voting',
                  n_jobs=1,
                  random_state=None,
                  verbose=0):
    """Patched __init__ version for the BaseBagging class."""
    # Handle estimator argument rename in sklearn >= 1.2
    if version.parse(sklearn_version) >= version.parse("1.2"):
        super(BaseBagging, self).__init__(
            estimator=base_estimator,
            n_estimators=n_estimators)
    else:
        super(BaseBagging, self).__init__(
            base_estimator=base_estimator,
            n_estimators=n_estimators)

    self.max_samples = max_samples
    self.max_features = max_features
    self.bootstrap = bootstrap
    self.bootstrap_features = bootstrap_features
    self.combination = combination
    self.n_jobs = n_jobs
    self.random_state = random_state
    self.verbose = verbose


BaseBagging.__init__ = _patched_init


from costcla.models import CostSensitiveRandomPatchesClassifier
from threadpoolctl import threadpool_limits


[docs]class Gray_Box(): """Class containing gray-box model and data processing functions.""" def __init__(self): self.gb_cla = CostSensitiveRandomPatchesClassifier
[docs] def prepare_predict_data( self, rec_data: dict[int, list], instances: list, gb_pw_inst_archive: list, mtp: dict[int, int], pair_cores: list[list[int]] ) -> tuple[list, list, list, list, int, list]: """ Transform single runtime output data instances into pairwise comparison data instances. Note ---- Implementation based on the paper: “Realtime gray-box algorithm configuration using cost-sensitive classification” Parameters ---------- rec_data : dict[int, list] All runtime output data recorded so far in the tournament sorted by core the target algorithm runs on instances : list[list] Unique instances of recorded runtime output. gb_pw_inst_archive : list[list] All unique pairwise comparison instances. mtp : dict[int, int] Key is core, value is the last time point of the data recorded. pair_cores : list[list[int]] Core pairs corresponding to the Configurations compared in gb_pw_inst_archive. Returns ------- tuple - **pw_instances** : list[list], Pairwise comparisons of Configurations based on the most current output. - **cores** : list[list], Core pairs corresponding to the Configurations compared in pw_istances. - **instances** : bool, Unique instances of recorded runtime output. - **gb_pw_inst_archive** : list[list], All unique pairwise comparison instances. - **max_time_points** : dict[int, int], Key is core, value is the last time point of the data recorded. - **pair_cores** : list[list[int]], Core pairs corresponding to the Configurations compared in gb_pw_inst_archive. """ # Get last timepoint of recording for configs, if there were any max_time_points = {} for core, rd in rec_data.items(): rk = rd.keys() if rk: max_time_points[core] = max(rk) # Do not waste time computing with unchanged records if mtp != max_time_points: mtp = max_time_points cores = [] # Compute paiwise comparisons if len(max_time_points) > 1: recorded_cores = max_time_points.keys() pw_instances = [] for core_i in recorded_cores: for core_j in recorded_cores: if core_i != core_j: conf_i = rec_data[core_i][max_time_points[core_i]] conf_j = rec_data[core_j][max_time_points[core_j]] pw_inst = [ conf_i['CPUTimeExpended'], *conf_i['rt_feats'], conf_j['CPUTimeExpended'], *conf_j['rt_feats'], conf_i['CPUTimeExpended'] - conf_j['CPUTimeExpended'], *[a - b for a, b in zip(conf_i['rt_feats'], conf_j['rt_feats'])] ] # if pw_inst: # not in gb_pw_inst_archive: pw_instances.append(pw_inst) gb_pw_inst_archive.append(pw_inst) pair_cores.append( [conf_i['core'], conf_j['core']]) instances.append([conf_i, conf_j]) cores.append([core_i, core_j]) return (pw_instances, cores, instances, gb_pw_inst_archive, max_time_points, pair_cores) else: return ([], [], instances, gb_pw_inst_archive, mtp, pair_cores) else: return [], [], instances, gb_pw_inst_archive, mtp, pair_cores
[docs] def prepare_train_data( self, X_train: list[list], data: list[list], cores: list[list], winner: int, res: dict[int, float], instances: list[list], y: list[int], cost_mat: list[list] ) -> tuple[list[list], list[list], list[list]]: """ Adjust and add pairwise runtime output data gathered in tournament to the total runtime output data gathered so far to be used it for model training. Parameters ---------- X_train : list[list] Total pairwise runtime output data gathered so far. data : list[list] Pairwise runtime output data gathered during last tournament. cores : list[list], Core pairs corresponding to the Configurations compared in data. winner : int Index of the winning Configuration of the last tournament. res : dict[int, float] Results of the last tournament. instances : list[list] Unique instances of recorded runtime output. y : list[int] 0 signififying Configuration_0 is better, 1 signifying Configuration_1 is better within the pairwise comparison corresponding to instances in X_train. cost_mat : list[list] Cost matrix corresponding to instances in X_train. Returns ------- tuple - **X_train** : list[list], Updated total pairwise runtime output data gathered so far. - **y** : list[list], 0 signififying Configuration_0 is better, 1 signifying Configuration_1 is better within the pairwise comparison corresponding to instances in updated X_train. - **cost_mat** : list[list], Cost matrix corresponding to instances in updated X_train. """ max_res = max(res) disregard = [] for i, (c, X) in enumerate(zip(cores, instances)): if winner in (c[0], c[1]) or ( res[c[0]] != max_res and res[c[1]] != max_res): if c[0] == winner: y.append(0) # 0 -> i wins elif c[1] == winner: y.append(1) # 1 -> j wins elif res[c[0]] != max_res and res[c[1]] != max_res: if res[c[0]] < res[c[1]]: y.append(0) else: y.append(1) # Avoid invalid value in scalar divide (1.0 - cost / cost_base) # in costcla==0.6 by using epsilon instead of 0 epsilon = 0.000001 cost_mat.append( [max([epsilon, (res[c[0]] - res[c[1]]) / max_res]), max([epsilon, (res[c[1]] - res[c[0]]) / max_res]), 0, 0]) else: disregard.append(i) # Delete instances that compare configurations that both timed out for i in reversed(disregard): del data[i] X_train.extend(data) return X_train, y, cost_mat
[docs] def train_gb( self, X_train: list[list], y_train: list[int], cost_mat_train: list[list], cores: list[list] ) -> CostSensitiveRandomPatchesClassifier | None: """ Train gray-box model. Parameters ---------- X_train : list[list] Total pairwise runtime output data gathered so far. y_train : list[int] 0 signififying Configuration_0 is better, 1 signifying Configuration_1 is better within the pairwise comparison corresponding to instances in X_train. cost_mat_train : list[list] Cost matrix corresponding to instances in X_train. cores : list[list], Core pairs corresponding to the Configurations compared in X_train. Returns ------- CostSensitiveRandomPatchesClassifier or None Trained gray-box model or None, if not sufficient data for training. """ if len(X_train) > 2: with threadpool_limits(limits=cores): X_train, y_train, cost_mat_train = \ np.array(X_train), np.array(y_train), \ np.array(cost_mat_train) self.model = CostSensitiveRandomPatchesClassifier( combination='weighted_voting', max_samples=0.25, n_estimators=20 ).fit(X_train, y_train.ravel(), cost_mat_train) return self.model else: return None
[docs] def classify_configs(self, X_pred: list[list], cores: list[list], model: CostSensitiveRandomPatchesClassifier) -> list: """ Classify configurations in the current tournament based o current pairwise comparison features. Parameters ---------- X_pred : list[list] Pairwise comparison instances from most recent outputs. cores : list[list] Core number pairs corresponding to X_pred entries. model : CostSensitiveRandomPatchesClassifier Trained gray-box model. Returns ------- list 0 if C_0 is predicted to be better, 1 if otherwise, corresponding to cores. """ with threadpool_limits(limits=cores): predictions = None # self.model = model if not isinstance(X_pred, np.ndarray): X_pred = np.array(X_pred) if len(X_pred) > 1: predictions = model.predict(X_pred) return predictions
[docs] def term_list(self, pred: list, cores: list[list], verbosity: int) -> list: """ Process predictions and decide if and which Configuration runs should be terminated. Parameters ---------- pred : list 0 if C_0 is predicted to be better, 1 if otherwise, corresponding to cores. cores : list[list] Core number pairs. verbosity : int Hyperparameter turning on and off additional terminal outputs. Returns ------- list Core indices of Configuration runs to be terminated. """ better_counts = Counter() for i, (a, b) in enumerate(cores): if pred[i] == 0: better_counts[a] += 1 else: better_counts[b] += 1 # Ensure all cores are represented (even with 0 count) all_cores = set(core for pair in cores for core in pair) for core in all_cores: better_counts.setdefault(core, 0) # Convert to sorted list of (core, count) sorted_counts = \ sorted(better_counts.items(), key=lambda x: x[1], reverse=True) # Assign ranks (higher count = better rank = lower number) ranks = {core: rank for rank, (core, _) in enumerate(sorted_counts)} # Compute average of best and worst ranks max_rank = max(ranks.values()) min_rank = min(ranks.values()) avg_rank = (max_rank + min_rank) / 2 # Select cores better than avg_rank worse_than_avg = \ [core for core, rank in ranks.items() if rank >= avg_rank] if len(set(better_counts.values())) == 1: return [] else: if verbosity == 2: print('\n\n') print('GRAY BOX PAIRWSE COMPARISON STATS:') print("Better counts:", dict(better_counts)) print("Ranks:", ranks) print("Average rank:", avg_rank) print("Worse than average:", worse_than_avg) return worse_than_avg
if __name__ == '__main__': pass