import copy
import math
import os
import numpy as np
import pandas as pd
import scipy.stats
from scipy.stats import binom
from .c_sub_region import cSubRegion
"""
This function uniformly samples i_n_sampling points, each with i_n_rep replications,
in the undetermined subregions of l_subr, and generates the dataframe that is sent to calibtool
input:
    i_n_sampling: number of sampling points needed in each subregion
    i_n_rep: number of replications needed per sampling point
    l_subr: list of examining subregions
output:
    l_subr
    df_testing_samples
"""
# TODO: plotter that can choose any two dimension and fix the value of other dimensions
def fun_sample_points_generator_deterministic(l_subr, i_n_sampling, i_n_rep, s_stage, l_para):
l_column = ['l_coordinate_lower', 'l_coordinate_upper'] + l_para + ['replication']
df_testing_samples = pd.DataFrame([],
columns=l_column) # the dataframe contains the sampling point sent to calibtool
    df_testing_samples['replication'] = df_testing_samples['replication'].astype(int)
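    # note: DataFrame.append, used throughout this module, was deprecated in pandas 1.4
    # and removed in pandas 2.0; this code assumes an older pandas where it is available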
l_sampling_subregions = []
    if s_stage == 'stage_1':
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True]
    elif s_stage == 'stage_2':
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True and
                                 len(c_subr.pd_sample_record) > 0]
    elif s_stage in ('stage_4-1', 'stage_4-2'):  # both stage-4 phases use the same filter
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True and
                                 (c_subr.b_worst is True or c_subr.b_elite is True)]
if not l_sampling_subregions:
return [l_subr, pd.DataFrame()]
for c_subr in l_sampling_subregions:
c_subr.pd_sample_record = c_subr.pd_sample_record.sort_values(by="mean",
ascending=True) # sort before start
c_subr.pd_sample_record = c_subr.pd_sample_record.reset_index(drop=True) # reindex before start
        if len(c_subr.pd_sample_record) >= i_n_sampling:  # if there are already enough sampling points
for i in (i for i in range(0, len(c_subr.pd_sample_record)) if
c_subr.pd_sample_record.loc[i, 'rep'] < i_n_rep): # check enough # reps or not, i is index
l_vals = [c_subr.l_coordinate_lower] + \
[c_subr.l_coordinate_upper] + \
[c_subr.pd_sample_record.loc[i, p] for p in l_para] + \
[i_n_rep - int(c_subr.pd_sample_record.loc[i, 'rep'])]
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
        else:  # not enough sampling points / replications yet
if len(c_subr.pd_sample_record) >= 1: # if already has sample points, first deal with them
for i in (
i for i in range(0, len(c_subr.pd_sample_record))
if c_subr.pd_sample_record.loc[i, 'rep'] < i_n_rep
): # check enough # reps or not for existing old sampling points
l_vals = [c_subr.l_coordinate_lower] + \
[c_subr.l_coordinate_upper] + \
[c_subr.pd_sample_record.loc[i, p] for p in l_para] + \
[i_n_rep - int(c_subr.pd_sample_record.loc[i, 'rep'])]
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
            i_ini_length = len(c_subr.pd_sample_record)  # number of sampling points so far in this subregion
for i in range(i_ini_length, i_n_sampling): # create new rows for new sampling points
c_subr.pd_sample_record.loc[i] = [1 for n in range(len(c_subr.pd_sample_record.columns))]
c_subr.pd_sample_record.loc[i, 'rep'] = 0
index = [x for x in range(i_ini_length, i_n_sampling)]
for i in range(0, len(l_para)): # create new sampling point and add to dataframe
a_new_sample = np.random.uniform(low=c_subr.l_coordinate_lower[i],
high=c_subr.l_coordinate_upper[i],
size=i_n_sampling - i_ini_length) # generate only for one dim
                c_subr.pd_sample_record.loc[index, l_para[i]] = pd.Series(a_new_sample.tolist(), index)
c_subr.pd_sample_record.loc[index, 'mean'] = 0
c_subr.pd_sample_record.loc[index, 'var'] = 0
c_subr.pd_sample_record.loc[index, 'SST'] = 0
            for i in range(i_ini_length, i_n_sampling):  # put the newly generated sample points in df_testing_samples
l_vals = [c_subr.l_coordinate_lower] + [c_subr.l_coordinate_upper] + [c_subr.pd_sample_record.loc[i, p]
for p in l_para] + [i_n_rep]
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
return [l_subr, df_testing_samples]
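# Usage sketch (hypothetical values, for illustration only): with two parameters
# named 'x' and 'y',
#   l_subr, df = fun_sample_points_generator_deterministic(l_subr, i_n_sampling=10,
#                                                          i_n_rep=1, s_stage='stage_1',
#                                                          l_para=['x', 'y'])
# returns the updated subregions plus one dataframe row per sampling point that
# still needs replications, ready to be sent to calibtool.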
def fun_sample_points_generator_noise(l_subr, i_n_sampling, i_n_rep, s_stage, l_para):
l_column = ['l_coordinate_lower', 'l_coordinate_upper'] + l_para
df_testing_samples = pd.DataFrame([], columns=l_column)
    if s_stage == 'stage_1':
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True]
    elif s_stage == 'stage_2':
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True and
                                 len(c_subr.pd_sample_record) > 0]
    else:  # 'stage_4-1' and 'stage_4-2' use the same filter
        l_sampling_subregions = [c_subr for c_subr in l_subr
                                 if c_subr.s_label == 'C' and c_subr.b_activate is True and
                                 (c_subr.b_worst is True or c_subr.b_elite is True)]
for c_subr in l_sampling_subregions:
c_subr.pd_sample_record = c_subr.pd_sample_record.sort_values(by="mean",
ascending=True) # sort before start
c_subr.pd_sample_record = c_subr.pd_sample_record.reset_index(drop=True) # reindex before start
        if len(c_subr.pd_sample_record) >= i_n_sampling:  # if there are already enough sampling points
for i in (
i for i in range(0, len(c_subr.pd_sample_record)) if c_subr.pd_sample_record.loc[i, 'rep'] < i_n_rep
): # check enough # reps or not, i is index
l_vals = [c_subr.l_coordinate_lower] + \
[c_subr.l_coordinate_upper] + \
[c_subr.pd_sample_record.loc[i, p] for p in l_para]
for j in range(0, i_n_rep - int(c_subr.pd_sample_record.loc[i, 'rep'])):
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
        else:  # not enough sampling points / replications yet
if len(c_subr.pd_sample_record) >= 1: # if already has sample points, first deal with them
for i in (
i for i in range(0, len(c_subr.pd_sample_record)) if c_subr.pd_sample_record.loc[i, 'rep'] < i_n_rep
): # check enough # reps or not for existing old sampling points
l_vals = [c_subr.l_coordinate_lower] + \
[c_subr.l_coordinate_upper] + \
[c_subr.pd_sample_record.loc[i, p] for p in l_para]
                    for j in range(0, i_n_rep - int(c_subr.pd_sample_record.loc[i, 'rep'])):
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
            i_ini_length = len(c_subr.pd_sample_record)  # number of sampling points so far in this subregion
for i in range(i_ini_length, i_n_sampling): # create new rows for new sampling points
c_subr.pd_sample_record.loc[i] = [1 for n in range(len(c_subr.pd_sample_record.columns))]
index = [x for x in range(i_ini_length, i_n_sampling)]
for i in range(0, len(l_para)): # create new sampling point and add to dataframe
a_new_sample = np.random.uniform(low=c_subr.l_coordinate_lower[i],
high=c_subr.l_coordinate_upper[i],
size=i_n_sampling - i_ini_length) # generate only for one dim
                c_subr.pd_sample_record.loc[index, l_para[i]] = pd.Series(a_new_sample.tolist(), index)
c_subr.pd_sample_record.loc[index, 'mean'] = 0
c_subr.pd_sample_record.loc[index, 'var'] = 0
c_subr.pd_sample_record.loc[index, 'SST'] = 0
c_subr.pd_sample_record.loc[index, 'rep'] = 0
            for i in range(i_ini_length, i_n_sampling):  # put the newly generated sample points in df_testing_samples
                l_vals = [c_subr.l_coordinate_lower] + [c_subr.l_coordinate_upper] + \
                         [c_subr.pd_sample_record.loc[i, p] for p in l_para]
for j in range(0, i_n_rep):
df_testing_samples = df_testing_samples.append(dict(zip(l_column, l_vals)), ignore_index=True)
return [l_subr, df_testing_samples]
def turn_to_power(l_numbers, power):
    return [number ** power for number in l_numbers]
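# e.g. turn_to_power([1, 2, 3], 2) returns [1, 4, 9]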
def fun_results_organizer_deterministic(l_subr, df_testing_samples, params):
# df_testing_samples: ['l_coordinate_lower', 'l_coordinate_upper'] + l_para + ['replication'] + ['result']
df_testing_samples = df_testing_samples.reset_index(drop=True)
for c_subr in [c_subr for c_subr in l_subr if c_subr.s_label == 'C' and c_subr.b_activate is True]:
        df_testing_samples_s_subr = pd.DataFrame(
            [], columns=[p['Name'] for p in params] + ['rep', 'mean', 'var', 'SST'])
        df_testing_samples_s_subr['rep'] = df_testing_samples_s_subr['rep'].astype(int)
        df_testing_samples_s_subr['mean'] = df_testing_samples_s_subr['mean'].astype(float)
        df_testing_samples_s_subr['var'] = df_testing_samples_s_subr['var'].astype(float)
        df_testing_samples_s_subr['SST'] = df_testing_samples_s_subr['SST'].astype(float)
c_subr.pd_sample_record.drop(c_subr.pd_sample_record[c_subr.pd_sample_record.rep == 0].index, inplace=True)
df_testing_samples['l_coordinate_lower'] = df_testing_samples['l_coordinate_lower'].astype(str)
df_testing_samples['l_coordinate_upper'] = df_testing_samples['l_coordinate_upper'].astype(str)
df_testing_samples_s_subr[[p['Name'] for p in params] + ['mean']] = \
df_testing_samples[
(df_testing_samples['l_coordinate_lower'] == str(c_subr.l_coordinate_lower)) & # noqa: W504
(df_testing_samples['l_coordinate_upper'] == str(c_subr.l_coordinate_upper))][[p['Name'] for p in params] + ['result']]
df_testing_samples_s_subr = df_testing_samples_s_subr.reset_index(drop=True)
if len(df_testing_samples_s_subr) > 0:
for i in range(0, len(df_testing_samples_s_subr)):
df_testing_samples_s_subr.loc[i, 'mean'] = df_testing_samples_s_subr.loc[i, 'mean'][0]
df_testing_samples_s_subr['rep'] = 1
df_testing_samples_s_subr['var'] = 0
df_testing_samples_s_subr['SST'] = df_testing_samples_s_subr.apply(lambda row: (row['mean'] * row['mean']),
axis=1)
c_subr.pd_sample_record = pd.concat([c_subr.pd_sample_record, df_testing_samples_s_subr])
        # the following updates i_min_sample, i_max_sample, f_min_diff_sample_mean, and f_max_var
c_subr.pd_sample_record = c_subr.pd_sample_record.sort_values(by="mean", ascending=True)
c_subr.pd_sample_record = c_subr.pd_sample_record.reset_index(drop=True) # reindex the sorted df
if len(c_subr.pd_sample_record) > 0:
c_subr.i_min_sample = c_subr.pd_sample_record.loc[0, 'mean']
c_subr.i_max_sample = c_subr.pd_sample_record.loc[len(c_subr.pd_sample_record) - 1, 'mean']
c_subr.f_min_diff_sample_mean = min(
c_subr.pd_sample_record['mean'].shift(-1) - c_subr.pd_sample_record['mean'])
c_subr.f_max_var = max(c_subr.pd_sample_record.loc[:, 'var'])
return l_subr
def fun_results_organizer_noise(l_subr, df_testing_samples, i_n_k, i_n_elite_worst, s_stage, params):
l_params = [p['Name'] for p in params]
if s_stage in ['stage_1', 'stage_2', 'stage_4-1']:
i_n = i_n_k
else:
i_n = i_n_elite_worst
df_testing_samples = df_testing_samples.reset_index(drop=True)
df_testing_samples['square_result'] = np.power(df_testing_samples['result'], 2)
df_testing_samples_grouped = df_testing_samples.groupby(l_params).agg(
{'result': [np.mean, np.var, np.sum, 'count'], 'square_result': 'sum'}).reset_index()
df_testing_samples_grouped.columns = df_testing_samples_grouped.columns.droplevel(level=0)
df_testing_samples_grouped.columns = l_params + ['new_data_mean', 'new_data_var', 'new_data_sum', 'new_data_rep',
'new_data_SST']
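    # after the groupby-agg the MultiIndex columns are flattened and renamed: each
    # sampled point now carries the mean, variance, sum, replication count, and sum
    # of squared results (new_data_SST) of its newly returned replications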
for c_subr in [c_subr for c_subr in l_subr if c_subr.s_label == 'C' and c_subr.b_activate is True]:
c_subr.pd_sample_record = pd.merge(c_subr.pd_sample_record, df_testing_samples_grouped, on=l_params)
        c_subr.pd_sample_record['new_data_mean'] = c_subr.pd_sample_record['new_data_mean'].astype(float)
        c_subr.pd_sample_record['new_data_var'] = c_subr.pd_sample_record['new_data_var'].astype(float)
        c_subr.pd_sample_record['new_data_sum'] = c_subr.pd_sample_record['new_data_sum'].astype(float)
        c_subr.pd_sample_record['new_data_rep'] = c_subr.pd_sample_record['new_data_rep'].astype(int)
        c_subr.pd_sample_record['new_data_SST'] = c_subr.pd_sample_record['new_data_SST'].astype(float)
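        # Pooled-update algebra (restating the lines below): the new mean is the
        # replication-weighted average of the old and new batches, SST accumulates
        # the sum of squared results, and the unbiased variance follows from
        #   var = (SST - n * mean^2) / (n - 1), with n = i_n total replications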
        c_subr.pd_sample_record['mean'] = c_subr.pd_sample_record.apply(
            lambda row: float(row['rep'] * row['mean'] + row['new_data_rep'] * row['new_data_mean']) / i_n, axis=1)
        c_subr.pd_sample_record['SST'] = c_subr.pd_sample_record.apply(
            lambda row: row['SST'] + row['new_data_SST'], axis=1)
        c_subr.pd_sample_record['var'] = c_subr.pd_sample_record.apply(
            lambda row: float(row['SST'] - i_n * pow(row['mean'], 2)) / (i_n - 1), axis=1)
        c_subr.pd_sample_record['rep'] = i_n
c_subr.pd_sample_record.drop(['new_data_mean', 'new_data_var', 'new_data_sum', 'new_data_rep', 'new_data_SST'],
inplace=True, axis=1)
return l_subr
"""
This function orders the sampling points within a single subregion
input:
    c_subr: one subregion
output:
    c_subr: subregion whose dataframe is sorted in ascending order of the mean, with updated summary attributes
"""
def fun_order_subregion(c_subr):
c_subr.pd_sample_record = c_subr.pd_sample_record.sort_values(by="mean", ascending=True)
c_subr.pd_sample_record = c_subr.pd_sample_record.reset_index(drop=True) # reindex the sorted df
if len(c_subr.pd_sample_record) > 0:
c_subr.i_min_sample = c_subr.pd_sample_record.loc[0, 'mean']
c_subr.i_max_sample = c_subr.pd_sample_record.loc[len(c_subr.pd_sample_record) - 1, 'mean']
c_subr.f_min_diff_sample_mean = min(c_subr.pd_sample_record['mean'].shift(-1) - c_subr.pd_sample_record['mean'])
c_subr.f_max_var = max(c_subr.pd_sample_record.loc[:, 'var'])
return c_subr
"""
fun_replication_update calculates the updated replication number
input:
    l_subr: list of all examining subregions
    i_n_rep: original replication number
    f_alpha
output:
    i_n_rep: updated replication number
"""
def fun_replication_update(l_subr, i_n_rep, f_alpha):
    l_min_diffs = [i.f_min_diff_sample_mean for i in l_subr
                   if i.s_label == 'C' and i.b_activate is True]
    if not l_min_diffs:  # prevent taking min() of an empty sequence
        f_d_star = 0.005
    elif min(l_min_diffs) < 0.005:
        f_d_star = 0.005
    else:
        f_d_star = min(l_min_diffs)
f_var_star = max(i.f_max_var for i in l_subr if i.s_label == 'C' and i.b_activate is True)
z = scipy.stats.norm.ppf(1 - f_alpha / 2)
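    # Sample-size rationale (a sketch, not from the original comments): separating two
    # means that differ by at least d* with confidence 1 - alpha under a normal
    # approximation needs roughly n >= z_{1-alpha/2}^2 * var* / d*^2 replications;
    # the extra factor of 4 below appears to be a safety margin.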
# to prevent the float NaN
    if not (math.isnan(z) or math.isnan(f_d_star) or math.isnan(f_var_star)):
        i_n_rep = max(i_n_rep, 4 * int(math.ceil(pow(z, 2) * f_var_star / pow(f_d_star, 2))))
return i_n_rep
"""
This function gathers and orders the sampling points from all the undetermined regions
input:
    l_subr: list of all subregions
output:
    pd_order_z: dataframe of the pooled data, sorted in ascending order of the mean
"""
def fun_order_region(l_subr):
l_all_sample_C = []
for i in (i for i in l_subr if i.s_label == 'C'):
l_all_sample_C.append(i.pd_sample_record)
pd_order_z = pd.concat(l_all_sample_C)
pd_order_z = pd_order_z.sort_values(by="mean", ascending=True)
pd_order_z = pd_order_z.reset_index(drop=True) # reindex the sorted df
return pd_order_z
"""
input:
    f_CI_u: upper bound of the confidence interval
    l_subr: list of examining subregions
output:
    l_subr: list of updated subregions
"""
def fun_pruning_indicator(l_subr, f_CI_u):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.s_label == 'C' and c_subr.b_activate is True and c_subr.b_worst is True):
if c_subr.i_min_sample > f_CI_u:
c_subr.b_maintaining_indicator = False
c_subr.b_pruning_indicator = True
return l_subr
"""
This function flags, among the elite subregions, those to be maintained
input:
    f_CI_l: lower bound of the confidence interval
    l_subr: list of examining subregions
output:
    l_subr: list of updated subregions
"""
def fun_maintaining_indicator(l_subr, f_CI_l):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.s_label == 'C' and c_subr.b_activate is True and c_subr.b_elite is True):
if c_subr.i_max_sample < f_CI_l:
c_subr.b_maintaining_indicator = True
c_subr.b_pruning_indicator = False
return l_subr
"""
This function flags the elite subregions used in step 4
input:
    f_CI_l: lower bound of the confidence interval
    l_subr: list of examining subregions
output:
    l_subr: list of updated subregions
"""
def fun_elite_indicator(l_subr, f_CI_l):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.s_label == 'C' and c_subr.b_activate is True and c_subr.i_max_sample < f_CI_l):
c_subr.b_elite = True
c_subr.b_worst = False
return l_subr
"""
This function flags the worst subregions used in step 4
input:
    f_CI_u: upper bound of the confidence interval
    l_subr: list of examining subregions
output:
    l_subr: list of updated subregions
"""
def fun_worst_indicator(l_subr, f_CI_u):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.s_label == 'C' and c_subr.b_activate is True and c_subr.i_min_sample > f_CI_u):
c_subr.b_elite = False
c_subr.b_worst = True
return l_subr
"""
This function updates the quantile
input:
    l_subr: list of subregions
    f_delta
output:
    f_delta: updated quantile
"""
def fun_quantile_update(l_subr, f_delta):
f_vol_C = sum(c.f_volume for c in l_subr if c.s_label == 'C' and c.b_activate is True)
f_vol_pruning = sum(c.f_volume for c in l_subr if c.b_pruning_indicator is True and c.b_activate is True)
f_vol_maintaining = sum(c.f_volume for c in l_subr if c.b_maintaining_indicator is True and c.b_activate is True)
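    # Renormalization (restating the formula below): once the maintained volume V_M and
    # pruned volume V_P are settled, the quantile over the remaining undetermined volume
    # that targets the same absolute volume is (f_delta * V_C - V_M) / (V_C - V_P - V_M)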
f_delta = float(f_delta * f_vol_C - f_vol_maintaining) / (f_vol_C - f_vol_pruning - f_vol_maintaining)
return f_delta
"""
input:
    l_subr: list of examining subregions
    pd_order_z: dataframe of the pooled, ordered sample data
    f_delta_k: current quantile
    f_alpha_k: significance level
    f_epsilon
output:
    ci_l: lower bound of the confidence interval
    ci_u: upper bound of the confidence interval
"""
def fun_ci_builder(l_subr, pd_order_z, f_delta_k, f_alpha_k, f_epsilon):
f_vol_s = l_subr[0].f_volume
f_vol_c = sum(c.f_volume for c in l_subr if c.s_label == 'C' and c.b_activate is True)
f_vol_p = sum(c.f_volume for c in l_subr if c.s_label == 'P' and c.b_activate is True)
f_vol_m = sum(c.f_volume for c in l_subr if c.s_label == 'M' and c.b_activate is True)
f_delta_kl = f_delta_k - float(f_vol_p * f_epsilon) / (f_vol_s * f_vol_c)
f_delta_ku = f_delta_k + float(f_vol_m * f_epsilon) / (f_vol_s * f_vol_c)
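    # Order-statistic CI (a sketch of the standard argument): among n ordered sample
    # means, the count falling below the delta-quantile is Binomial(n, delta), so the
    # order statistics at the alpha/2 and 1 - alpha/2 binomial quantiles bracket the
    # delta-quantile with confidence of about 1 - alpha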
f_max_r = binom.ppf(f_alpha_k / 2, len(pd_order_z), f_delta_kl)
f_min_s = binom.ppf(1 - f_alpha_k / 2, len(pd_order_z), f_delta_ku)
    if math.isnan(f_max_r):
        f_max_r = 0
    ci_l = pd_order_z.loc[int(f_max_r), 'mean']  # binom.ppf returns a float; cast before indexing
    ci_u = pd_order_z.loc[int(f_min_s), 'mean']
    return [ci_u, ci_l]
def fun_pruning_labeler(l_subr):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.b_pruning_indicator is True and c_subr.b_activate is True): # <-- whose worst == 1
c_subr.s_label = 'P'
return l_subr
def fun_maintaining_labeler(l_subr):
for c_subr in (c_subr for c_subr in l_subr if
c_subr.b_maintaining_indicator is True and c_subr.b_activate is True): # <-- whose elite == 1
c_subr.s_label = 'M'
return l_subr
"""
input:
    c_subr: the subregion to branch
    i_n_branching: number of branches B
    s_branching_dim: the parameter dimension along which to branch
output:
    l_subr_new: list of the B new subregions after branching
"""
def fun_reg_branching(c_subr, i_n_branching, params, s_branching_dim):
i_max_index = [p['Name'] for p in params].index(s_branching_dim)
l_subr_new = []
# the following creates B subregions in the list of subregions
for i in range(0, i_n_branching):
l_coordinate_lower = copy.deepcopy(c_subr.l_coordinate_lower)
l_coordinate_upper = copy.deepcopy(c_subr.l_coordinate_upper)
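        # each of the B children takes an equal 1/B slice of the parent's extent
        # along the branching dimension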
l_coordinate_lower[i_max_index] = float(
(c_subr.l_coordinate_upper[i_max_index] - c_subr.l_coordinate_lower[i_max_index]) * i
) / i_n_branching + c_subr.l_coordinate_lower[i_max_index]
l_coordinate_upper[i_max_index] = float(
(c_subr.l_coordinate_upper[i_max_index] - c_subr.l_coordinate_lower[i_max_index]) * (i + 1)
) / i_n_branching + c_subr.l_coordinate_lower[i_max_index]
l_new_branching_subr = cSubRegion(l_coordinate_lower, l_coordinate_upper, params)
l_subr_new.append(l_new_branching_subr)
    # the following reallocates the sampling points
for i in l_subr_new:
i.pd_sample_record = c_subr.pd_sample_record[
(c_subr.pd_sample_record[s_branching_dim] > i.l_coordinate_lower[i_max_index]) & # noqa: W504
(c_subr.pd_sample_record[s_branching_dim] < i.l_coordinate_upper[i_max_index])]
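        # note: the strict inequalities mean a sample lying exactly on a shared
        # boundary is assigned to neither child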
for i in l_subr_new: # reindex the sampling points into 0 1 2...
i.pd_sample_record = i.pd_sample_record.reset_index(drop=True)
        # update attributes based on data
if len(i.pd_sample_record) > 0:
i.i_min_sample = min(i.pd_sample_record.loc[:, 'mean'])
i.i_max_sample = max(i.pd_sample_record.loc[:, 'mean'])
i.f_min_diff_sample_mean = min(i.pd_sample_record['mean'].shift(-1) - i.pd_sample_record['mean'])
if len(i.pd_sample_record) > 1:
i.f_max_var = max(i.pd_sample_record.loc[:, 'var'])
return l_subr_new
def fun_plot2D(l_subr, l_initial_coordinate_lower, l_initial_coordinate_upper, params, str_k, s_running_file_name,
i_iteration):
import matplotlib.patches as patches
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.plot(l_initial_coordinate_upper[0], l_initial_coordinate_upper[1])
ax.plot(l_initial_coordinate_lower[0], l_initial_coordinate_lower[1])
for c_subr in (c_subr for c_subr in l_subr if c_subr.b_activate is True):
if c_subr.s_label == 'M':
alpha_value = 1
elif c_subr.s_label == 'P':
alpha_value = 0.1
else: # i.s_label == 'C':
alpha_value = 0.6
ax.add_patch(
patches.Rectangle(
(c_subr.l_coordinate_lower[0], c_subr.l_coordinate_lower[1]), # (x,y)
c_subr.l_coordinate_upper[0] - c_subr.l_coordinate_lower[0], # width
c_subr.l_coordinate_upper[1] - c_subr.l_coordinate_lower[1], # height
alpha=alpha_value,
edgecolor="black"
)
)
    # the following plots the minimum and maximum points
df_all_sample = pd.concat([c_subr.pd_sample_record for c_subr in l_subr if c_subr.b_activate is True])
df_all_sample = df_all_sample.sort_values(by="mean", ascending=True) # sort before start
df_all_sample = df_all_sample.reset_index(drop=True)
f_min_value = df_all_sample.loc[0, 'mean']
f_max_value = df_all_sample.loc[len(df_all_sample) - 1, 'mean']
l_min_coordinate = [df_all_sample.loc[0, p['Name']] for p in params]
l_max_coordinate = [df_all_sample.loc[len(df_all_sample) - 1, p['Name']] for p in params]
p_min, p_max = ax.plot(l_min_coordinate[0], l_min_coordinate[1], '*b', l_max_coordinate[0], l_max_coordinate[1],
'or')
fig.legend((p_min, p_max), (
'minimum point:[' + str(l_min_coordinate[0]) + ',' + str(l_min_coordinate[1]) + '], result:' + str(f_min_value),
'maximum point:[' + str(l_max_coordinate[0]) + ',' + str(l_max_coordinate[1]) + '], result:' + str(
f_max_value)), 'upper right')
for c_subr in (c_subr for c_subr in l_subr if c_subr.b_activate is True):
ax.text(float(c_subr.l_coordinate_lower[0] + c_subr.l_coordinate_upper[0]) / 2,
float(c_subr.l_coordinate_lower[1] + c_subr.l_coordinate_upper[1]) / 2,
str(c_subr.pd_sample_record['rep'].sum()))
i_total_simulation = 0
for c_subr in (c_subr for c_subr in l_subr if c_subr.b_activate is True):
i_total_simulation += (c_subr.pd_sample_record['rep'].sum())
    fig.text(0.02, 0.02, 'total number of simulations:' + str(i_total_simulation), fontsize=14)
ax.set_xlabel([p['Name'] for p in params][0])
ax.set_ylabel([p['Name'] for p in params][1])
# make sure file directory exists
fig_file = s_running_file_name + '/iter' + str(i_iteration) + '/Region_Status ' + str(str_k) + '.pdf'
d_file = os.path.dirname(fig_file)
if not os.path.exists(d_file):
os.makedirs(d_file)
fig.savefig(fig_file)
plt.close(fig)