diff --git a/example/FL/preprocessing/hfl_simpleimputer_numeric.json b/example/FL/preprocessing/hfl_simpleimputer_numeric.json new file mode 100644 index 0000000000000000000000000000000000000000..2d3db5e4dd0790a3de1a809230388f0ed422a684 --- /dev/null +++ b/example/FL/preprocessing/hfl_simpleimputer_numeric.json @@ -0,0 +1,69 @@ +{ + "party_info": { + "task_manager": "127.0.0.1:50050" + }, + "component_params": { + "roles": { + "server": "Alice", + "client": [ + "Bob", + "Charlie" + ] + }, + "common_params": { + "model": "FL_Preprocess", + "process": "fit_transform", + "FL_type": "H", + "task_name": "HFL_simpleimputer_numeric_fit_transform", + "task": "classification", + "selected_column": [ + "MinTemp", + "MaxTemp", + "Rainfall", + "Evaporation", + "Sunshine", + "WindGustSpeed", + "WindSpeed9am", + "WindSpeed3pm", + "Humidity9am", + "Humidity3pm", + "Pressure9am", + "Pressure3pm", + "Cloud9am", + "Cloud3pm", + "Temp9am", + "Temp3pm", + "RISK_MM" + ], + "id": "id", + "label": "y", + "preprocess_column": null, + "preprocess_module": { + "SimpleImputer_numeric": { + "column": null, + "missing_values": "np.nan", + "strategy": "mean", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + } + } + }, + "role_params": { + "Bob": { + "data_set": "preprocess_hfl_train_client1", + "preprocess_dataset_path": "data/result/Bob_train_dataset.csv", + "preprocess_module_path": "data/result/Bob_preprocess_module.pkl" + }, + "Charlie": { + "data_set": "preprocess_hfl_train_client2", + "preprocess_dataset_path": "data/result/Charlie_train_dataset.csv", + "preprocess_module_path": "data/result/Charlie_preprocess_module.pkl" + }, + "Alice": { + "data_set": "fl_fake_data" + } + } + } +} \ No newline at end of file diff --git a/example/FL/preprocessing/hfl_simpleimputer_string.json b/example/FL/preprocessing/hfl_simpleimputer_string.json new file mode 100644 index 0000000000000000000000000000000000000000..40e62d48f9978058d9b61833eadb21336814953e --- /dev/null +++ b/example/FL/preprocessing/hfl_simpleimputer_string.json @@ -0,0 +1,56 @@ +{ + "party_info": { + "task_manager": "127.0.0.1:50050" + }, + "component_params": { + "roles": { + "server": "Alice", + "client": [ + "Bob", + "Charlie" + ] + }, + "common_params": { + "model": "FL_Preprocess", + "process": "fit_transform", + "FL_type": "H", + "task_name": "HFL_simpleimputer_string_fit_transform", + "task": "classification", + "selected_column": [ + "WindGustDir", + "WindDir9am", + "WindDir3pm", + "RainToday" + ], + "id": "id", + "label": "y", + "preprocess_column": null, + "preprocess_module": { + "SimpleImputer_string": { + "column": null, + "missing_values": "np.nan", + "strategy": "most_frequent", + "fill_value": null, + "copy": true, + "add_indicator": false, + "keep_empty_features": false + } + } + }, + "role_params": { + "Bob": { + "data_set": "preprocess_hfl_train_client1", + "preprocess_dataset_path": "data/result/Bob_train_dataset.csv", + "preprocess_module_path": "data/result/Bob_preprocess_module.pkl" + }, + "Charlie": { + "data_set": "preprocess_hfl_train_client2", + "preprocess_dataset_path": "data/result/Charlie_train_dataset.csv", + "preprocess_module_path": "data/result/Charlie_preprocess_module.pkl" + }, + "Alice": { + "data_set": "fl_fake_data" + } + } + } +} \ No newline at end of file diff --git a/python/primihub/FL/preprocessing/imputer.py b/python/primihub/FL/preprocessing/imputer.py index 8be01cf824063d79d3949b6fcdbdc9cad89ee1d7..9ef3ebe65b454f4cc5aaab673f43a2a3b0b36d36 100644 --- a/python/primihub/FL/preprocessing/imputer.py +++ b/python/primihub/FL/preprocessing/imputer.py @@ -1,9 +1,12 @@ +import numbers import numpy as np from sklearn.impute import SimpleImputer as SKL_SimpleImputer +from sklearn.impute._base import _BaseImputer from .base import PreprocessBase +from .util import get_dense_mask, unique -class SimpleImputer(PreprocessBase): +class SimpleImputer(PreprocessBase, _BaseImputer): def __init__(self, missing_values=np.nan, @@ -22,7 +25,160 @@ class SimpleImputer(PreprocessBase): copy=copy, add_indicator=add_indicator, keep_empty_features=keep_empty_features) + if FL_type == 'H': + self.missing_values = missing_values + self.strategy = strategy + self.copy = copy + self.add_indicator = add_indicator - def Hfit(self, x): - pass - \ No newline at end of file + def Hfit(self, X): + if self.role == 'client': + X = self.module._validate_input(X, in_fit=True) + + # default fill_value is 0 for numerical input and "missing_value" + # otherwise + if self.module.fill_value is None: + if X.dtype.kind in ("i", "u", "f"): + fill_value = 0 + else: + fill_value = "missing_value" + else: + fill_value = self.module.fill_value + + # fill_value should be numerical in case of numerical input + if ( + self.module.strategy == "constant" + and X.dtype.kind in ("i", "u", "f") + and not isinstance(fill_value, numbers.Real) + ): + raise ValueError( + "'fill_value'={0} is invalid. Expected a " + "numerical value when imputing numerical " + "data".format(fill_value) + ) + + elif self.role == 'server': + fill_value = self.module.fill_value + + self.module.statistics_ = \ + self._dense_fit( + X, + self.module.strategy, + self.module.missing_values, + fill_value + ) + + return self + + def _dense_fit(self, X, strategy, missing_values, fill_value): + """Fit the transformer on dense data.""" + if self.role == 'client': + missing_mask = get_dense_mask(X, missing_values) + masked_X = np.ma.masked_array(X, mask=missing_mask) + + super()._fit_indicator(missing_mask) + self.module.indicator_ = self.indicator_ + + # Mean + if strategy == "mean": + if self.role == 'client': + sum_masked = np.ma.sum(masked_X, axis=0) + self.channel.send('sum_masked', sum_masked) + + n_samples = X.shape[0] - np.sum(missing_mask, axis=0) + # for backward-compatibility, reduce n_samples to an integer + # if the number of samples is the same for each feature (i.e. no + # missing values) + if np.ptp(n_samples) == 0: + n_samples = n_samples[0] + self.channel.send('n_samples', n_samples) + mean = self.channel.recv('mean') + + elif self.role == 'server': + sum_masked = self.channel.recv_all('sum_masked') + sum_masked = np.ma.sum(sum_masked, axis=0) + + n_samples = self.channel.recv_all('n_samples') + # n_samples could be np.int or np.ndarray + n_sum = 0 + for n in n_samples: + n_sum += n + if isinstance(n_sum, np.ndarray) and np.ptp(n_sum) == 0: + n_sum = n_sum[0] + + mean_masked = sum_masked / n_sum + # Avoid the warning "Warning: converting a masked element to nan." + mean = np.ma.getdata(mean_masked) + mean[np.ma.getmask(mean_masked)] = 0 if self.module.keep_empty_features else np.nan + self.channel.send_all('mean', mean) + + return mean + + # Median + elif strategy == "median": + median_masked = np.ma.median(masked_X, axis=0) + # Avoid the warning "Warning: converting a masked element to nan." + median = np.ma.getdata(median_masked) + median[np.ma.getmaskarray(median_masked)] = ( + 0 if self.module.keep_empty_features else np.nan + ) + + return median + + # Most frequent + elif strategy == "most_frequent": + if self.role == 'client': + frequency_counts = [] + # To be able access the elements by columns + X = X.transpose() + mask = missing_mask.transpose() + + for row, row_mask in zip(X[:], mask[:]): + row_mask = np.logical_not(row_mask).astype(bool) + row = row[row_mask] + + if len(row) == 0 and self.module.keep_empty_features: + frequency_counts.append(([missing_values], len(row_mask))) + else: + frequency_counts.append(unique(row, return_counts=True)) + + self.channel.send('frequency_counts', frequency_counts) + most_frequent = self.channel.recv('most_frequent') + + elif self.role == 'server': + frequency_counts = self.channel.recv_all('frequency_counts') + n_features = len(frequency_counts[0]) + most_frequent = [] + + for feature_idx in range(n_features): + feature_counts = {} + for client_fc in frequency_counts: + feature, counts = client_fc[feature_idx] + for i, key in enumerate(feature): + if key in feature_counts: + feature_counts[key] += counts[i] + else: + feature_counts[key] = counts[i] + + most_frequent_for_idx = max(feature_counts, key=feature_counts.get) + if most_frequent_for_idx == missing_values: + if self.module.keep_empty_features: + most_frequent_for_idx = 0 + else: + most_frequent_for_idx = np.nan + most_frequent.append(most_frequent_for_idx) + most_frequent = np.array(most_frequent) + + self.channel.send_all('most_frequent', most_frequent) + + return most_frequent + + # Constant + elif strategy == "constant": + if self.role == 'client': + # for constant strategy, self.statistcs_ is used to store + # fill_value in each column + return np.full(X.shape[1], fill_value, dtype=X.dtype) + elif self.role == 'server': + return fill_value + \ No newline at end of file diff --git a/python/primihub/FL/preprocessing/pipeline.py b/python/primihub/FL/preprocessing/pipeline.py index 9d53a8456708ced9e085aa335c805c793d7ad6b9..98d8a7a150028da12531ec43ae57cc7a8e0f7d89 100644 --- a/python/primihub/FL/preprocessing/pipeline.py +++ b/python/primihub/FL/preprocessing/pipeline.py @@ -308,8 +308,15 @@ def select_module(module_name, params, FL_type, role, channel=None): channel=channel ) elif "SimpleImputer" in module_name: + missing_values = params.get('missing_values') + if isinstance(missing_values, str): + missing_values = missing_values.lower() + if missing_values == 'np.nan': + missing_values = np.nan + elif missing_values == 'pd.na': + missing_values = pd.NA module = SimpleImputer( - missing_values=params.get('missing_values', np.nan), + missing_values=missing_values, strategy=params.get('strategy', 'mean'), fill_value=params.get('fill_value'), copy=params.get('copy', True), diff --git a/python/primihub/FL/preprocessing/util.py b/python/primihub/FL/preprocessing/util.py index 84895756a60c7daef1aa34a8e70c5a49a98e355c..954df90761c023c3d92e0b94b8382cd4af336078 100644 --- a/python/primihub/FL/preprocessing/util.py +++ b/python/primihub/FL/preprocessing/util.py @@ -273,4 +273,31 @@ def num_samples(x): return len(x) except TypeError as type_error: raise TypeError(message) from type_error - \ No newline at end of file + + +def get_dense_mask(X, value_to_mask): + with suppress(ImportError, AttributeError): + # We also suppress `AttributeError` because older versions of pandas do + # not have `NA`. + import pandas + + if value_to_mask is pandas.NA: + return pandas.isna(X) + + if is_scalar_nan(value_to_mask): + if X.dtype.kind == "f": + Xt = np.isnan(X) + elif X.dtype.kind in ("i", "u"): + # can't have NaNs in integer array. + Xt = np.zeros(X.shape, dtype=bool) + else: + # np.isnan does not work on object dtypes. + Xt = _object_dtype_isnan(X) + else: + Xt = X == value_to_mask + + return Xt + + +def _object_dtype_isnan(X): + return X != X