提交 97f8a043 作者: openeuler-ci-bot 提交者: Gitee

!159 Implement weighted ensemble feature selection with multiprocessing to get a real speedup

Merge pull request !159 from bryantclc/master
...@@ -25,22 +25,24 @@ from sklearn.ensemble import BaggingRegressor, AdaBoostRegressor ...@@ -25,22 +25,24 @@ from sklearn.ensemble import BaggingRegressor, AdaBoostRegressor
from sklearn.tree import DecisionTreeRegressor from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import ElasticNet, Ridge from sklearn.linear_model import ElasticNet, Ridge
from sklearn.tree import ExtraTreeRegressor from sklearn.tree import ExtraTreeRegressor
import threading
import multiprocessing
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class FeatureSelectorThread(threading.Thread): class FeatureSelectorProcess(multiprocessing.Process):
"""class feature selector each with threading""" """class feature selector each with multiprocessing"""
def __init__(self, regressor, list_sample_x, list_sample_y, labels, index): def __init__(self, regressor, list_sample_x, list_sample_y, labels, index, sorted_index_queue, prediction_queue):
threading.Thread.__init__(self) multiprocessing.Process.__init__(self)
self._regressor = regressor self._regressor = regressor
self._list_sample_x = list_sample_x self._list_sample_x = list_sample_x
self._list_sample_y = list_sample_y self._list_sample_y = list_sample_y
self._labels = labels self._labels = labels
self._index = index self._index = index
self._sorted_index = [] self._sorted_index = []
self._sorted_index_queue = sorted_index_queue
self._prediction_queue = prediction_queue
def get_unified_feature_importance(self, regressor): def get_unified_feature_importance(self, regressor):
"""get unified feature importance for different type regressor""" """get unified feature importance for different type regressor"""
...@@ -56,6 +58,12 @@ class FeatureSelectorThread(threading.Thread): ...@@ -56,6 +58,12 @@ class FeatureSelectorThread(threading.Thread):
return feature_importances return feature_importances
return None return None
def get_ensemble_train_data(self):
"""get ensemble train data"""
regressor = self._regressor
prediction = regressor.predict(self._list_sample_x)
return prediction
def run(self): def run(self):
"""main method to train the model and get ranked feature importance""" """main method to train the model and get ranked feature importance"""
self._regressor.fit(self._list_sample_x, self._list_sample_y) self._regressor.fit(self._list_sample_x, self._list_sample_y)
...@@ -63,16 +71,12 @@ class FeatureSelectorThread(threading.Thread): ...@@ -63,16 +71,12 @@ class FeatureSelectorThread(threading.Thread):
result = zip(unified_feature_importance, self._labels, self._index) result = zip(unified_feature_importance, self._labels, self._index)
result = sorted(result, key=lambda x: -x[0]) result = sorted(result, key=lambda x: -x[0])
self._sorted_index = [i for coef, label, i in result] self._sorted_index = [i for coef, label, i in result]
self._sorted_index_queue.put(self._sorted_index)
def get_sorted_index(self): prediction = self.get_ensemble_train_data()
"""get sorted feature importance index""" self._prediction_queue.put(prediction)
try:
return self._sorted_index
except Exception:
return None
class WeightedEnsembleFeatureSelector: class WeightedEnsembleFeatureSelector(object):
"""class weighted ensemble feature selector""" """class weighted ensemble feature selector"""
def __init__(self): def __init__(self):
...@@ -92,23 +96,36 @@ class WeightedEnsembleFeatureSelector: ...@@ -92,23 +96,36 @@ class WeightedEnsembleFeatureSelector:
"""get native feature importances in parallel with multiple threading""" """get native feature importances in parallel with multiple threading"""
native_feature_importances = [] native_feature_importances = []
fs_thread_list = [] fs_thread_list = []
sorted_index_queue_list = []
prediction_queue_list = []
for regressor in self._regressors: for regressor in self._regressors:
fs_thread = FeatureSelectorThread(regressor, list_sample_x, list_sample_y, labels, index) sorted_index_queue = multiprocessing.Queue()
prediction_queue = multiprocessing.Queue()
fs_thread = FeatureSelectorProcess(regressor, list_sample_x, list_sample_y,
labels, index, sorted_index_queue, prediction_queue)
fs_thread_list.append(fs_thread) fs_thread_list.append(fs_thread)
sorted_index_queue_list.append(sorted_index_queue)
prediction_queue_list.append(prediction_queue)
fs_thread.start() fs_thread.start()
for fs_thread in fs_thread_list: for fs_thread in fs_thread_list:
fs_thread.join() fs_thread.join()
for fs_thread in fs_thread_list:
native_fi = fs_thread.get_sorted_index() for sorted_index_queue in sorted_index_queue_list:
native_fi = sorted_index_queue.get()
native_feature_importances.append(native_fi) native_feature_importances.append(native_fi)
return native_feature_importances LOGGER.info('get sorted index queue list')
def get_ensemble_train_datas(self, list_sample_x):
"""get ensemble train datas"""
predictions = [] predictions = []
for regressor in self._regressors: for prediction_queue in prediction_queue_list:
prediction = regressor.predict(list_sample_x) prediction = prediction_queue.get()
predictions.append(prediction) predictions.append(prediction)
LOGGER.info('get prediction queue list')
return native_feature_importances, predictions
def get_ensemble_train_datas(self, list_sample_x, predictions):
"""get ensemble train datas"""
train_datas = [] train_datas = []
for i in range(len(list_sample_x)): for i in range(len(list_sample_x)):
train_data = [] train_data = []
...@@ -117,9 +134,9 @@ class WeightedEnsembleFeatureSelector: ...@@ -117,9 +134,9 @@ class WeightedEnsembleFeatureSelector:
train_datas.append(train_data) train_datas.append(train_data)
return train_datas return train_datas
def get_ensemble_weights(self, list_sample_x, list_sample_y): def get_ensemble_weights(self, list_sample_x, list_sample_y, predictions):
"""get ensemble weights""" """get ensemble weights"""
ensemble_train_datas = self.get_ensemble_train_datas(list_sample_x) ensemble_train_datas = self.get_ensemble_train_datas(list_sample_x, predictions)
self._ensemble_model.fit(ensemble_train_datas, list_sample_y) self._ensemble_model.fit(ensemble_train_datas, list_sample_y)
orig_weight = self._ensemble_model.coef_ orig_weight = self._ensemble_model.coef_
orig_weight -= np.max(orig_weight) orig_weight -= np.max(orig_weight)
...@@ -129,10 +146,10 @@ class WeightedEnsembleFeatureSelector: ...@@ -129,10 +146,10 @@ class WeightedEnsembleFeatureSelector:
def get_ensemble_feature_importance(self, list_sample_x, list_sample_y, labels): def get_ensemble_feature_importance(self, list_sample_x, list_sample_y, labels):
"""Make sure the input list_sample_x is preprocessed with StandardScaler""" """Make sure the input list_sample_x is preprocessed with StandardScaler"""
index = list(range(len(labels))) index = list(range(len(labels)))
native_feature_importances = self.get_native_feature_importances_parallel( native_feature_importances, predictions = self.get_native_feature_importances_parallel(
list_sample_x, list_sample_y, labels, index) list_sample_x, list_sample_y, labels, index)
LOGGER.info('Get feature importances for each model: %s', native_feature_importances) LOGGER.info('Get feature importances for each model: %s', native_feature_importances)
ensemble_weights = self.get_ensemble_weights(list_sample_x, list_sample_y) ensemble_weights = self.get_ensemble_weights(list_sample_x, list_sample_y, predictions)
LOGGER.info('Get ensemble weights for each model: %s', ensemble_weights) LOGGER.info('Get ensemble weights for each model: %s', ensemble_weights)
ensemble_scores = [0 for i in range(len(list_sample_x[0]))] ensemble_scores = [0 for i in range(len(list_sample_x[0]))]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册