Commit 30dc5289 authored by J jrzaurin

Directory renamed to utils. Also renamed variables, etc., for consistency with the rest of the code.

Parent 046b3b53
import numpy as np
import pandas as pd
import pickle
import cv2
import os
from pathlib import Path
from sklearn.utils import Bunch
from sklearn.model_selection import train_test_split
from .wide_utils import prepare_wide
from .deep_utils import prepare_deep
from .image_utils import prepare_image
from .text_utils import prepare_text
from ..wdtypes import *
pd.options.mode.chained_assignment = None
def prepare_data(df:pd.DataFrame, target:str, wide_cols:List[str],
crossed_cols:List[Tuple[str,str]], cat_embed_cols:List[Union[str,
Tuple[str,int]]], continuous_cols:List[str],
already_dummies:Optional[List[str]]=None,
already_standard:Optional[List[str]]=None, scale:bool=True,
default_embed_dim:int=8, padded_sequences:Optional[np.ndarray]=None,
vocab:Optional[Any]=None, word_embed_matrix:Optional[np.ndarray]=None,
text_col:Optional[str]=None, max_vocab:int=30000, min_freq:int=5,
maxlen:int=80, word_vectors_path:Optional[PosixPath]=None,
img_col:Optional[str]=None, img_path:Optional[PosixPath]=None,
width:int=224, height:int=224,
processed_images:Optional[np.ndarray]=None,
filepath:Optional[str]=None, seed:int=1, verbose:int=1) -> Bunch:
# Target
y = df[target].values
# Wide
X_wide = prepare_wide(df, wide_cols, crossed_cols, already_dummies)
# Deep Dense Layers
X_deep, cat_embed_input, cat_embed_encoding_dict, deep_column_idx = \
prepare_deep(df, cat_embed_cols, continuous_cols, already_standard,
scale, default_embed_dim)
# sklearn's Bunch as Container for the dataset
wd_dataset = Bunch(target=y, wide=X_wide.astype('float32'),
deepdense=X_deep, cat_embed_input=cat_embed_input,
cat_embed_encoding_dict=cat_embed_encoding_dict,
continuous_cols=continuous_cols,
deep_column_idx=deep_column_idx)
# Deep Text
if padded_sequences is not None:
assert vocab is not None, 'A vocabulary object is missing'
wd_dataset.deeptext, wd_dataset.vocab = padded_sequences, vocab
if word_embed_matrix is not None:
wd_dataset.word_embed_matrix = word_embed_matrix
elif text_col:
X_text, word_embed_matrix, vocab = \
prepare_text(df, text_col, max_vocab, min_freq, maxlen, word_vectors_path, verbose)
wd_dataset.deeptext, wd_dataset.vocab = X_text, vocab
if word_embed_matrix is not None:
wd_dataset.word_embed_matrix = word_embed_matrix
# Deep Image
if processed_images is not None:
X_images = processed_images
elif img_col:
X_images = prepare_image(df, img_col, img_path, width, height, verbose)
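# compute per-channel mean and std over the processed images so they can be used
# to normalise image inputs at train time; cv2.meanStdDev returns the channel
# statistics in BGR order, hence the reordering below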
mean_R, mean_G, mean_B = [], [], []
std_R, std_G, std_B = [], [], []
try:
for img in X_images:
(mean_b, mean_g, mean_r), (std_b, std_g, std_r) = cv2.meanStdDev(img)
mean_R.append(mean_r), mean_G.append(mean_g), mean_B.append(mean_b)
std_R.append(std_r), std_G.append(std_g), std_B.append(std_b)
normalise_metrics = dict(
mean = {"R": np.mean(mean_R)/255., "G": np.mean(mean_G)/255., "B": np.mean(mean_B)/255.},
std = {"R": np.mean(std_R)/255., "G": np.mean(std_G)/255., "B": np.mean(std_B)/255.}
)
wd_dataset.deepimage, wd_dataset.normalise_metrics = X_images, normalise_metrics
except NameError:
pass
if filepath is not None:
    assert not os.path.isdir(filepath), "filepath is a directory. Please provide the full path, including the filename"
    file_dir = os.path.dirname(filepath)
    if file_dir and not os.path.exists(file_dir):
        os.makedirs(file_dir)
    with open(filepath, 'wb') as f:
        pickle.dump(wd_dataset, f)
if verbose: print('Wide and Deep data preparation completed.')
return wd_dataset
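A rough usage sketch of prepare_data (the DataFrame df and all column names below are made up, in the style of the adult census dataset, and are not part of this module):

# hypothetical call, assuming df contains the listed columns
wd_data = prepare_data(
    df,
    target='income_label',
    wide_cols=['education', 'relationship', 'workclass'],
    crossed_cols=[('education', 'workclass')],
    cat_embed_cols=[('education', 10), ('relationship', 8)],
    continuous_cols=['age', 'hours_per_week'])
# wd_data is a sklearn Bunch with wd_data.target, wd_data.wide, wd_data.deepdense,
# wd_data.cat_embed_input, wd_data.deep_column_idx, etc.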
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from ..wdtypes import *
pd.options.mode.chained_assignment = None
def prepare_deep(df:pd.DataFrame, embed_cols:List[Union[str, Tuple[str,int]]],
continuous_cols:List[str], already_standard:Optional[List[str]]=None, scale:bool=True,
default_embed_dim:int=8):
if isinstance(embed_cols[0], tuple):
embed_dim = dict(embed_cols)
embed_coln = [emb[0] for emb in embed_cols]
else:
embed_dim = {e:default_embed_dim for e in embed_cols}
embed_coln = embed_cols
deep_cols = embed_coln + continuous_cols
df_deep = df.copy()[deep_cols]
df_deep, encoding_dict = label_encode(df_deep, cols=embed_coln)
embeddings_input = []
for k,v in encoding_dict.items():
embeddings_input.append((k, len(v), embed_dim[k]))
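# embeddings_input collects one (column name, number of unique values, embedding dim)
# tuple per categorical column; the deep-dense component uses it to size its
# embedding layers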
deep_column_idx = {k:v for v,k in enumerate(df_deep.columns)}
if scale:
scaler = StandardScaler()
if already_standard is not None:
standardize_cols = [c for c in continuous_cols if c not in already_standard]
else: standardize_cols = continuous_cols
for cc in standardize_cols:
df_deep[cc] = scaler.fit_transform(df_deep[cc].values.reshape(-1,1).astype(float))
return df_deep.values, embeddings_input, encoding_dict, deep_column_idx
def label_encode(df_inp:pd.DataFrame, cols:Optional[List[str]]=None,
val_to_idx:Optional[Dict[str,Dict[str,int]]]=None):
df = df_inp.copy()
if cols is None:
cols = list(df.select_dtypes(include=['object']).columns)
if not val_to_idx:
val_types = dict()
for c in cols:
val_types[c] = df[c].unique()
val_to_idx = dict()
for k, v in val_types.items():
val_to_idx[k] = {o: i for i, o in enumerate(val_types[k])}
for k, v in val_to_idx.items():
df[k] = df[k].apply(lambda x: v[x])
return df, val_to_idx
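A minimal sketch of label_encode's behaviour on a toy DataFrame (the data below is purely illustrative):

toy_df = pd.DataFrame({'color': ['red', 'blue', 'red'], 'size': ['S', 'M', 'S']})
encoded_df, encoding_dict = label_encode(toy_df)
# encoding_dict -> {'color': {'red': 0, 'blue': 1}, 'size': {'S': 0, 'M': 1}}
# encoded_df['color'].tolist() -> [0, 1, 0]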
import numpy as np
import pandas as pd
import warnings
import imutils
import cv2
from typing import List
from os import listdir
from tqdm import tqdm
from ..wdtypes import *
def prepare_image(df:pd.DataFrame, img_col:str, img_path:str, width:int,
height:int, verbose:int=1)->np.ndarray:
image_list = df[img_col].tolist()
if verbose: print('Reading Images from {}'.format(img_path))
imgs = [cv2.imread("/".join([str(img_path), img])) for img in image_list]
# find images whose height and width differ, i.e. non-square images
aspect = [(im.shape[0], im.shape[1]) for im in imgs]
aspect_r = [a[0]/a[1] for a in aspect]
diff_idx = [i for i,r in enumerate(aspect_r) if r!=1.]
if verbose: print('Resizing')
aap = AspectAwarePreprocessor(width, height)
spp = SimplePreprocessor(width, height)
resized_imgs = []
for i,img in tqdm(enumerate(imgs), total=len(imgs), disable=verbose != 1):
if i in diff_idx:
resized_imgs.append(aap.preprocess(img))
else:
resized_imgs.append(spp.preprocess(img))
return np.asarray(resized_imgs)
# AspectAwarePreprocessor and SimplePreprocessor are taken directly from the
# great series of books "Deep Learning for Computer Vision" by Adrian Rosebrock
# (https://www.pyimagesearch.com/author/adrian/). Check
# https://www.pyimagesearch.com/
class AspectAwarePreprocessor:
def __init__(self, width:int, height:int, inter=cv2.INTER_AREA):
self.width = width
self.height = height
self.inter = inter
def preprocess(self, image:np.ndarray)->np.ndarray:
(h, w) = image.shape[:2]
dW = 0
dH = 0
if w < h:
image = imutils.resize(image, width=self.width,
inter=self.inter)
dH = int((image.shape[0] - self.height) / 2.0)
else:
image = imutils.resize(image, height=self.height,
inter=self.inter)
dW = int((image.shape[1] - self.width) / 2.0)
(h, w) = image.shape[:2]
image = image[dH:h - dH, dW:w - dW]
return cv2.resize(image, (self.width, self.height),
interpolation=self.inter)
class SimplePreprocessor:
def __init__(self, width:int, height:int, inter=cv2.INTER_AREA):
self.width = width
self.height = height
self.inter = inter
def preprocess(self, image:np.ndarray)->np.ndarray:
return cv2.resize(image, (self.width, self.height),
interpolation=self.inter)
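For reference, a small sketch of how the two preprocessors differ (the input array is synthetic):

img = np.zeros((300, 600, 3), dtype='uint8')  # non-square dummy image
aap = AspectAwarePreprocessor(224, 224)
spp = SimplePreprocessor(224, 224)
# both return a (224, 224, 3) array; AspectAwarePreprocessor resizes the shorter
# side to the target size and centre-crops the longer one, preserving the aspect
# ratio, while SimplePreprocessor resizes directly and may distort the image
print(aap.preprocess(img).shape, spp.preprocess(img).shape)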
import numpy as np
import pandas as pd
import html
import re
from pathlib import PosixPath
from typing import List
from gensim.utils import tokenize
from fastai.text import Tokenizer
from fastai.text.transform import Vocab
from ..wdtypes import *
def prepare_text(df:pd.DataFrame, text_col:str, max_vocab:int, min_freq:int, maxlen:int,
word_vectors_path:Optional[str]=None, verbose:int=1):
texts = df[text_col].tolist()
tokens = get_texts(texts)
vocab = Vocab.create(tokens, max_vocab=max_vocab, min_freq=min_freq)
sequences = [vocab.numericalize(t) for t in tokens]
padded_seq = np.array([pad_sequences(s, maxlen=maxlen) for s in sequences])
if verbose:
print("The vocabulary contains {} words".format(len(vocab.stoi)))
if word_vectors_path is not None:
embedding_matrix = build_embeddings_matrix(vocab, word_vectors_path)
else:
embedding_matrix = None
return padded_seq, embedding_matrix, vocab
def simple_preprocess(doc:str, lower:bool=False, deacc:bool=False, min_len:int=2,
max_len:int=15) -> List[str]:
tokens = [
token for token in tokenize(doc, lower=lower, deacc=deacc, errors='ignore')
if min_len <= len(token) <= max_len and not token.startswith('_')
]
return tokens
def get_texts(texts:List[str]) -> List[List[str]]:
processed_texts = [' '.join(simple_preprocess(t)) for t in texts]
tok = Tokenizer().process_all(processed_texts)
return tok
def pad_sequences(seq:List[int], maxlen:int=190, pad_first:bool=True, pad_idx:int=1) -> np.ndarray:
if len(seq) >= maxlen:
res = np.array(seq[-maxlen:]).astype('int32')
return res
else:
res = np.zeros(maxlen, dtype='int32') + pad_idx
if pad_first: res[-len(seq):] = seq
else: res[:len(seq)] = seq
return res
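A quick illustration of the padding behaviour (pad_idx defaults to 1, which matches the padding index used by fastai's default Vocab):

print(pad_sequences([5, 6, 7], maxlen=6))                    # [1 1 1 5 6 7]
print(pad_sequences([5, 6, 7], maxlen=6, pad_first=False))   # [5 6 7 1 1 1]
print(pad_sequences(list(range(10)), maxlen=6))              # [4 5 6 7 8 9]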
def build_embeddings_matrix(vocab:Vocab, word_vectors_path:PosixPath, verbose:int=1) -> np.ndarray:
if verbose: print('Indexing word vectors...')
embeddings_index = {}
f = open(str(word_vectors_path))
for line in f:
values = line.split()
word = values[0]
coefs = np.asarray(values[1:], dtype='float32')
embeddings_index[word] = coefs
f.close()
if verbose:
print('Loaded {} word vectors'.format(len(embeddings_index)))
print('Preparing embeddings matrix...')
mean_word_vector = np.mean(list(embeddings_index.values()), axis=0)
embedding_dim = len(list(embeddings_index.values())[0])
num_words = len(vocab.itos)
embedding_matrix = np.zeros((num_words, embedding_dim))
found_words=0
for i,word in enumerate(vocab.itos):
embedding_vector = embeddings_index.get(word)
if embedding_vector is not None:
embedding_matrix[i] = embedding_vector
found_words+=1
else:
embedding_matrix[i] = mean_word_vector
if verbose:
print('{} of the {} vocabulary words were found in {}'.format(found_words, num_words, word_vectors_path))
return embedding_matrix
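A hedged usage sketch of prepare_text (the DataFrame and column name are made up; pass a path to e.g. GloVe-format vectors as word_vectors_path to also get a pretrained embedding matrix):

texts_df = pd.DataFrame({'description': ['a first short text', 'a second, slightly longer text']})
padded_seq, embed_mtx, vocab = prepare_text(
    texts_df, text_col='description', max_vocab=30000, min_freq=1, maxlen=80,
    word_vectors_path=None)
# padded_seq.shape -> (2, 80); embed_mtx is None because no word_vectors_path was given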
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from ..wdtypes import *
def prepare_wide(df:pd.DataFrame, wide_cols:List[str], crossed_cols:List[Tuple[str,str]],
already_dummies:Optional[List[str]]=None) -> np.ndarray:
df_wide = df.copy()[wide_cols]
crossed_columns = []
for cols in crossed_cols:
colname = '_'.join(cols)
df_wide[colname] = df_wide[list(cols)].apply(lambda x: '-'.join(x), axis=1)
crossed_columns.append(colname)
if already_dummies:
dummy_cols = [c for c in wide_cols+crossed_columns if c not in already_dummies]
else:
dummy_cols = wide_cols+crossed_columns
df_wide = pd.get_dummies(df_wide, columns=dummy_cols)
return df_wide.values
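For illustration, prepare_wide one-hot encodes the wide columns plus the generated crossed columns; a toy example (values invented):

toy_df = pd.DataFrame({'education': ['11th', 'HS-grad'], 'workclass': ['Private', 'Self-emp']})
X_wide = prepare_wide(toy_df, wide_cols=['education', 'workclass'],
                      crossed_cols=[('education', 'workclass')])
# one dummy column per observed category of 'education', 'workclass' and the
# crossed 'education_workclass' feature -> X_wide.shape == (2, 6)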