import warnings
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from pytorch_widedeep.wdtypes import (
Dict,
List,
Tuple,
Union,
Literal,
Optional,
)
from pytorch_widedeep.utils.general_utils import Alias
from pytorch_widedeep.utils.deeptabular_utils import LabelEncoder
from pytorch_widedeep.preprocessing.base_preprocessor import (
BasePreprocessor,
check_is_fitted,
)
def embed_sz_rule(
n_cat: int,
embedding_rule: Literal["google", "fastai_old", "fastai_new"] = "fastai_new",
) -> int:
r"""Rule of thumb to pick embedding size corresponding to ``n_cat``. Default rule is taken
from recent fastai's Tabular API. The function also includes previously used rule by fastai
and rule included in the Google's Tensorflow documentation
Parameters
----------
n_cat: int
number of unique categorical values in a feature
embedding_rule: str, default = fastai_old
rule of thumb to be used for embedding vector size
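
    Examples
    --------
    A minimal sketch of the three rules applied to a feature with 100 unique
    values:

    >>> embed_sz_rule(n_cat=100)
    21
    >>> embed_sz_rule(n_cat=100, embedding_rule="google")
    3
    >>> embed_sz_rule(n_cat=100, embedding_rule="fastai_old")
    50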
"""
if embedding_rule == "google":
return int(round(n_cat**0.25))
elif embedding_rule == "fastai_old":
return int(min(50, (n_cat // 2) + 1))
else:
return int(min(600, round(1.6 * n_cat**0.56)))
class Quantizer:
"""Helper class to perform the quantization of continuous columns. It is
included in this docs for completion, since depending on the value of the
parameter `'quantization_setup'` of the `TabPreprocessor` class, that
class might have an attribute of type `Quantizer`. However, this class is
designed to always run internally within the `TabPreprocessor` class.
Parameters
----------
quantization_setup: Dict, default = None
Dictionary where the keys are the column names to quantize and the
values are the either integers indicating the number of bins or a
list of scalars indicating the bin edges.
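
    Examples
    --------
    A minimal usage sketch (in practice this class runs internally within
    `TabPreprocessor`):

    >>> import pandas as pd
    >>> df = pd.DataFrame({"col": [1.0, 2.0, 3.0, 4.0, 5.0]})
    >>> quantizer = Quantizer({"col": 2})
    >>> quantizer.fit_transform(df)["col"].tolist()
    [0, 0, 0, 1, 1]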
"""
def __init__(
self,
quantization_setup: Dict[str, Union[int, List[float]]],
**kwargs,
):
self.quantization_setup = quantization_setup
self.quant_args = kwargs
self.is_fitted = False
def fit(self, df: pd.DataFrame) -> "Quantizer":
self.bins: Dict[str, List[float]] = {}
for col, bins in self.quantization_setup.items():
_, self.bins[col] = pd.cut(
df[col], bins, retbins=True, labels=False, **self.quant_args
)
        self.inversed_bins: Dict[str, Dict[int, float]] = {}
        for col, bins in self.bins.items():
            # map each bin index to the mid point of its bin; used by
            # 'TabPreprocessor.inverse_transform'
            mid_points = [(a + b) / 2.0 for a, b in zip(bins, bins[1:])]
            self.inversed_bins[col] = dict(enumerate(mid_points))
self.is_fitted = True
return self
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
check_is_fitted(self, condition=self.is_fitted)
dfc = df.copy()
for col, bins in self.bins.items():
dfc[col] = pd.cut(dfc[col], bins, labels=False, **self.quant_args)
return dfc
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
return self.fit(df).transform(df)
class TabPreprocessor(BasePreprocessor):
r"""Preprocessor to prepare the `deeptabular` component input dataset
Parameters
----------
    cat_embed_cols: List, default = None
        List containing the names of the categorical columns that will be
        represented by embeddings (e.g. _['education', 'relationship', ...]_) or
        a list of Tuples with the name and the embedding dimension (e.g.: _[
        ('education',32), ('relationship',16), ...]_)
    continuous_cols: List, default = None
        List with the names of the continuous cols
    quantization_setup: int or Dict, default = None
        Continuous columns can be turned into categorical via `pd.cut`. If
        `quantization_setup` is an `int`, all continuous columns will be
        quantized using this value as the number of bins. Alternatively, a
        dictionary can be passed where the keys are the column names to
        quantize and the values are either integers indicating the number of
        bins or a list of scalars indicating the bin edges.
    cols_to_scale: List or str, default = None
        List with the names of the columns that will be standardized via
        sklearn's `StandardScaler`. Alternatively, the string `'all'` can be
        passed to standardize all the continuous cols.
    scale: bool, default = False
        :information_source: **note**: this arg will be removed in the next
        release. Please use `cols_to_scale` instead.
        Bool indicating whether or not to scale/standardize continuous cols.
        It is important to emphasize that all the DL models for tabular data
        in the library also include the possibility of normalising the input
        continuous features via a `BatchNorm` or a `LayerNorm`.
        Param alias: `scale_cont_cols`.
    already_standard: List, default = None
        :information_source: **note**: this arg will be removed in the next
        release. Please use `cols_to_scale` instead.
        List with the names of the continuous cols that do not need to be
        scaled/standardized.
auto_embed_dim: bool, default = True
Boolean indicating whether the embedding dimensions will be
automatically defined via rule of thumb. See `embedding_rule`
below.
embedding_rule: str, default = 'fastai_new'
If `auto_embed_dim=True`, this is the choice of embedding rule of
thumb. Choices are:
- _fastai_new_: $min(600, round(1.6 \times n_{cat}^{0.56}))$
- _fastai_old_: $min(50, (n_{cat}//{2})+1)$
        - _google_: $round(n_{cat}^{0.25})$
default_embed_dim: int, default=16
Dimension for the embeddings if the embed_dim is not provided in the
`cat_embed_cols` parameter and `auto_embed_dim` is set to
`False`.
    with_attention: bool, default = False
        Boolean indicating whether the preprocessed data will be passed to an
        attention-based model (more precisely, a model where all embeddings
        must have the same dimension). If `True`, the param `cat_embed_cols`
        must be a list containing just the categorical column names: e.g.
        _['education', 'relationship', ...]_. This is because they will all be
        encoded using embeddings of the same dim, which will be specified
        later when the model is defined.
        Param alias: `for_transformer`
    with_cls_token: bool, default = False
        Boolean indicating if a `'[CLS]'` token will be added to the dataset
        when using attention-based models. The final hidden state
        corresponding to this token is used as the aggregated representation
        for classification and regression tasks. Otherwise, the categorical
        (and continuous embeddings, if present) will be concatenated before
        being passed to the final MLP (if present).
shared_embed: bool, default = False
Boolean indicating if the embeddings will be "shared" when using
attention-based models. The idea behind `shared_embed` is
described in the Appendix A in the [TabTransformer paper](https://arxiv.org/abs/2012.06678):
_'The goal of having column embedding is to enable the model to
distinguish the classes in one column from those in the other
        columns'_. In other words, the idea is to let the model learn which
        column is being embedded at any given time. See:
        `pytorch_widedeep.models.transformers._layers.SharedEmbeddings`.
    verbose: int, default = 1
        Verbosity. If greater than 0, informative messages and some warnings
        will be printed during the fit and inverse_transform processes.
Other Parameters
----------------
**kwargs: dict
`pd.cut` and `StandardScaler` related args
Attributes
----------
embed_dim: Dict
Dictionary where keys are the embed cols and values are the embedding
dimensions. If `with_attention` is set to `True` this attribute
is not generated during the `fit` process
    label_encoder: LabelEncoder
        see `pytorch_widedeep.utils.deeptabular_utils.LabelEncoder`
    cat_embed_input: List
        List of Tuples with the column name, the number of individual values
        for that column and, if `with_attention` is set to `False`, the
        corresponding embedding dim, e.g. _[('education', 16, 10),
        ('relationship', 6, 8), ...]_.
    standardize_cols: List
        List of the columns that will be standardized
scaler: StandardScaler
an instance of `sklearn.preprocessing.StandardScaler`
    column_idx: Dict
        Dictionary where keys are column names and values are column indexes.
        This is necessary to slice tensors
quantizer: Quantizer
an instance of `Quantizer`
Examples
--------
>>> import pandas as pd
>>> import numpy as np
>>> from pytorch_widedeep.preprocessing import TabPreprocessor
>>> df = pd.DataFrame({'color': ['r', 'b', 'g'], 'size': ['s', 'n', 'l'], 'age': [25, 40, 55]})
>>> cat_embed_cols = [('color',5), ('size',5)]
>>> cont_cols = ['age']
>>> deep_preprocessor = TabPreprocessor(cat_embed_cols=cat_embed_cols, continuous_cols=cont_cols)
>>> X_tab = deep_preprocessor.fit_transform(df)
>>> deep_preprocessor.embed_dim
{'color': 5, 'size': 5}
>>> deep_preprocessor.column_idx
{'color': 0, 'size': 1, 'age': 2}
>>> cont_df = pd.DataFrame({"col1": np.random.rand(10), "col2": np.random.rand(10) + 1})
>>> cont_cols = ["col1", "col2"]
>>> tab_preprocessor = TabPreprocessor(continuous_cols=cont_cols, quantization_setup=3)
>>> ft_cont_df = tab_preprocessor.fit_transform(cont_df)
>>> # or...
>>> quantization_setup = {'col1': [0., 0.4, 1.], 'col2': [1., 1.4, 2.]}
>>> tab_preprocessor2 = TabPreprocessor(continuous_cols=cont_cols, quantization_setup=quantization_setup)
>>> ft_cont_df2 = tab_preprocessor2.fit_transform(cont_df)
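    >>> # a couple more minimal sketches: scaling a subset of the continuous cols...
    >>> tab_preprocessor3 = TabPreprocessor(continuous_cols=cont_cols, cols_to_scale=["col1"])
    >>> ft_cont_df3 = tab_preprocessor3.fit_transform(cont_df)
    >>> # ...and preparing the data for an attention-based model with a [CLS] token
    >>> attn_preprocessor = TabPreprocessor(cat_embed_cols=['color', 'size'], with_attention=True, with_cls_token=True)
    >>> X_tab_attn = attn_preprocessor.fit_transform(df)
    >>> attn_preprocessor.column_idx
    {'cls_token': 0, 'color': 1, 'size': 2}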
"""
@Alias("with_attention", "for_transformer")
@Alias("cat_embed_cols", "embed_cols")
@Alias("scale", "scale_cont_cols")
def __init__(
self,
cat_embed_cols: Optional[Union[List[str], List[Tuple[str, int]]]] = None,
continuous_cols: Optional[List[str]] = None,
quantization_setup: Optional[
Union[int, Dict[str, Union[int, List[float]]]]
] = None,
        cols_to_scale: Optional[Union[List[str], str]] = None,
auto_embed_dim: bool = True,
embedding_rule: Literal["google", "fastai_old", "fastai_new"] = "fastai_new",
default_embed_dim: int = 16,
with_attention: bool = False,
with_cls_token: bool = False,
shared_embed: bool = False,
verbose: int = 1,
*,
scale: bool = False,
        already_standard: Optional[List[str]] = None,
**kwargs,
):
super(TabPreprocessor, self).__init__()
self.continuous_cols = continuous_cols
self.quantization_setup = quantization_setup
self.cols_to_scale = cols_to_scale
self.scale = scale
self.already_standard = already_standard
self.auto_embed_dim = auto_embed_dim
self.embedding_rule = embedding_rule
self.default_embed_dim = default_embed_dim
self.with_attention = with_attention
self.with_cls_token = with_cls_token
self.shared_embed = shared_embed
self.verbose = verbose
self.quant_args = {
k: v for k, v in kwargs.items() if k in pd.cut.__code__.co_varnames
}
self.scale_args = {
k: v for k, v in kwargs.items() if k in StandardScaler().get_params()
}
self._check_inputs(cat_embed_cols)
if with_cls_token:
self.cat_embed_cols = (
["cls_token"] + cat_embed_cols # type: ignore[operator]
if cat_embed_cols is not None
else ["cls_token"]
)
else:
self.cat_embed_cols = cat_embed_cols # type: ignore[assignment]
self.is_fitted = False
def fit(self, df: pd.DataFrame) -> BasePreprocessor:
"""Fits the Preprocessor and creates required attributes
Parameters
----------
df: pd.DataFrame
Input pandas dataframe
Returns
-------
TabPreprocessor
`TabPreprocessor` fitted object
"""
df_adj = self._insert_cls_token(df) if self.with_cls_token else df.copy()
if self.cat_embed_cols is not None:
df_emb = self._prepare_embed(df_adj)
self.label_encoder = LabelEncoder(
columns_to_encode=df_emb.columns.tolist(),
shared_embed=self.shared_embed,
with_attention=self.with_attention,
)
self.label_encoder.fit(df_emb)
self.cat_embed_input: List = []
for k, v in self.label_encoder.encoding_dict.items():
if self.with_attention:
self.cat_embed_input.append((k, len(v)))
else:
self.cat_embed_input.append((k, len(v), self.embed_dim[k]))
if self.continuous_cols is not None:
df_cont = self._prepare_continuous(df_adj)
if self.standardize_cols is not None:
self.scaler = StandardScaler(**self.scale_args).fit(
df_cont[self.standardize_cols].values
)
elif self.verbose:
warnings.warn("Continuous columns will not be normalised")
if self.cols_and_bins is not None:
                # 'Quantizer.fit' does not run here on purpose: in the edge
                # case where someone wants standardization and quantization
                # for the same columns, the Quantizer must run on the already
                # scaled data, i.e. within 'transform'
self.quantizer = Quantizer(self.cols_and_bins, **self.quant_args)
self.is_fitted = True
return self
def transform(self, df: pd.DataFrame) -> np.ndarray:
"""Returns the processed `dataframe` as a np.ndarray
Parameters
----------
df: pd.DataFrame
Input pandas dataframe
Returns
-------
np.ndarray
transformed input dataframe
"""
check_is_fitted(self, condition=self.is_fitted)
df_adj = self._insert_cls_token(df) if self.with_cls_token else df.copy()
if self.cat_embed_cols is not None:
df_emb = self._prepare_embed(df_adj)
df_emb = self.label_encoder.transform(df_emb)
if self.continuous_cols is not None:
df_cont = self._prepare_continuous(df_adj)
if self.standardize_cols:
df_cont[self.standardize_cols] = self.scaler.transform(
df_cont[self.standardize_cols].values
)
if self.cols_and_bins is not None:
                # fit the Quantizer on the first call so it runs on the
                # (possibly) scaled data; subsequent calls reuse the bins
                # learned then
                if self.quantizer.is_fitted:
                    df_cont = self.quantizer.transform(df_cont)
                else:
                    df_cont = self.quantizer.fit_transform(df_cont)
try:
df_deep = pd.concat([df_emb, df_cont], axis=1)
        except NameError:
            # either 'df_emb' or 'df_cont' was never created because no
            # categorical or no continuous cols were specified
try:
df_deep = df_emb.copy()
except NameError:
df_deep = df_cont.copy()
self.column_idx = {k: v for v, k in enumerate(df_deep.columns)}
return df_deep.values
def inverse_transform(self, encoded: np.ndarray) -> pd.DataFrame:
r"""Takes as input the output from the `transform` method and it will
return the original values.
Parameters
----------
encoded: np.ndarray
array with the output of the `transform` method
Returns
-------
pd.DataFrame
Pandas dataframe with the original values
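
        Examples
        --------
        A minimal round-trip sketch with illustrative column names:

        >>> import pandas as pd
        >>> from pytorch_widedeep.preprocessing import TabPreprocessor
        >>> df = pd.DataFrame({'color': ['r', 'b', 'g'], 'age': [25, 40, 55]})
        >>> tab_preprocessor = TabPreprocessor(cat_embed_cols=['color'], continuous_cols=['age'])
        >>> X_tab = tab_preprocessor.fit_transform(df)
        >>> tab_preprocessor.inverse_transform(X_tab)['color'].tolist()
        ['r', 'b', 'g']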
"""
decoded = pd.DataFrame(encoded, columns=self.column_idx.keys())
# embeddings back to original category
if self.cat_embed_cols is not None:
if isinstance(self.cat_embed_cols[0], tuple):
emb_c: List = [c[0] for c in self.cat_embed_cols]
else:
emb_c = self.cat_embed_cols.copy()
for c in emb_c:
decoded[c] = decoded[c].map(self.label_encoder.inverse_encoding_dict[c])
# quantized cols to the mid point
if self.quantization_setup is not None:
if self.verbose:
                print(
                    "Note that quantized cols will be turned into the mid point of "
                    "the corresponding bin"
                )
for k, v in self.quantizer.inversed_bins.items():
decoded[k] = decoded[k].map(v)
# continuous_cols back to non-standarised
try:
decoded[self.continuous_cols] = self.scaler.inverse_transform(
decoded[self.continuous_cols]
)
except AttributeError:
pass
if "cls_token" in decoded.columns:
decoded.drop("cls_token", axis=1, inplace=True)
return decoded
def fit_transform(self, df: pd.DataFrame) -> np.ndarray:
"""Combines `fit` and `transform`
Parameters
----------
df: pd.DataFrame
Input pandas dataframe
Returns
-------
np.ndarray
transformed input dataframe
"""
return self.fit(df).transform(df)
def _insert_cls_token(self, df: pd.DataFrame) -> pd.DataFrame:
df_cls = df.copy()
df_cls.insert(loc=0, column="cls_token", value="[CLS]")
return df_cls
def _prepare_embed(self, df: pd.DataFrame) -> pd.DataFrame:
if self.with_attention:
return df[self.cat_embed_cols]
else:
if isinstance(self.cat_embed_cols[0], tuple):
self.embed_dim: Dict = dict(self.cat_embed_cols) # type: ignore
embed_colname = [emb[0] for emb in self.cat_embed_cols]
elif self.auto_embed_dim:
n_cats = {col: df[col].nunique() for col in self.cat_embed_cols}
                self.embed_dim = {  # type: ignore[misc]
                    col: embed_sz_rule(n_cat, self.embedding_rule)
                    for col, n_cat in n_cats.items()
                }
embed_colname = self.cat_embed_cols # type: ignore
else:
self.embed_dim = {
e: self.default_embed_dim for e in self.cat_embed_cols
} # type: ignore
embed_colname = self.cat_embed_cols # type: ignore
return df[embed_colname]
def _prepare_continuous(self, df: pd.DataFrame) -> pd.DataFrame:
if self.is_fitted:
return df[self.continuous_cols]
else:
if self.cols_to_scale is not None:
self.standardize_cols = (
self.cols_to_scale
if self.cols_to_scale != "all"
else self.continuous_cols
)
elif self.scale:
if self.already_standard is not None:
self.standardize_cols = [
c
for c in self.continuous_cols
if c not in self.already_standard
]
else:
self.standardize_cols = self.continuous_cols
else:
self.standardize_cols = None
if self.quantization_setup is not None:
if isinstance(self.quantization_setup, int):
self.cols_and_bins: Dict[str, Union[int, List[float]]] = {}
for col in self.continuous_cols:
self.cols_and_bins[col] = self.quantization_setup
else:
self.cols_and_bins = self.quantization_setup.copy()
else:
self.cols_and_bins = None
return df[self.continuous_cols]
def _check_inputs(self, cat_embed_cols): # noqa: C901
if self.scale or self.already_standard is not None:
warnings.warn(
"'scale' and 'already_standard' will be deprecated in the next release. "
"Please use 'cols_to_scale' instead",
DeprecationWarning,
stacklevel=2,
)
if self.scale:
if self.already_standard is not None:
standardize_cols = [
c for c in self.continuous_cols if c not in self.already_standard
]
else:
standardize_cols = self.continuous_cols
        elif self.cols_to_scale is not None:
            standardize_cols = (
                self.cols_to_scale
                if self.cols_to_scale != "all"
                else self.continuous_cols
            )
else:
standardize_cols = None
if standardize_cols is not None:
if isinstance(self.quantization_setup, int):
cols_to_quantize_and_standardize = [
c for c in standardize_cols if c in self.continuous_cols
]
elif isinstance(self.quantization_setup, dict):
cols_to_quantize_and_standardize = [
c for c in standardize_cols if c in self.quantization_setup
]
else:
cols_to_quantize_and_standardize = None
        if cols_to_quantize_and_standardize:
warnings.warn(
f"the following columns: {cols_to_quantize_and_standardize} will be first scaled"
" using a StandardScaler and then quantized. Make sure this is what you really want"
)
if self.with_cls_token and not self.with_attention:
            warnings.warn(
                "If 'with_cls_token' is set to 'True', 'with_attention' will "
                "automatically be set to 'True'"
            )
self.with_attention = True
if (cat_embed_cols is None) and (self.continuous_cols is None):
raise ValueError(
"'cat_embed_cols' and 'continuous_cols' are 'None'. Please, define at least one of the two."
)
if (
cat_embed_cols is not None
and self.continuous_cols is not None
and len(np.intersect1d(cat_embed_cols, self.continuous_cols)) > 0
):
overlapping_cols = list(
np.intersect1d(cat_embed_cols, self.continuous_cols)
)
raise ValueError(
"Currently passing columns as both categorical and continuum is not supported."
" Please, choose one or the other for the following columns: {}".format(
", ".join(overlapping_cols)
)
)
        transformer_error_message = (
            "If with_attention is 'True', cat_embed_cols must be a list "
            "of strings with the columns to be encoded as embeddings."
        )
if (
self.with_attention
and cat_embed_cols is not None
and isinstance(cat_embed_cols[0], tuple)
):
raise ValueError(transformer_error_message)