提交 38828686 编写于 作者: X xujiaqi01

Merge branch 'develop' into 'develop'

add slot reader

See merge request !53
......@@ -38,6 +38,34 @@ class Model(object):
self._namespace = "train.model"
self._platform = envs.get_platform()
def _init_slots(self):
    """Create the model input variables declared by the reader slot config.

    Reads ``sparse_slots`` and ``dense_slots`` from the ``train.reader``
    namespace. Both are space-separated strings; each dense entry has the
    form ``name:dim1,dim2,...`` (optionally wrapped in ``[]``).

    For every dense slot a ``float32`` data layer with the declared shape is
    created; for every sparse slot an ``int64`` data layer with ``shape=[1]``
    and ``lod_level=1``. Each variable is appended to ``self._data_var`` and
    to ``self._dense_data_var`` / ``self._sparse_data_var`` respectively
    (dense variables first, then sparse, each in configuration order).

    Afterwards, if ``dataset_class`` in ``train.reader`` is ``"DataLoader"``,
    ``self._init_dataloader()`` is called.

    Raises:
        ValueError: if a dense slot entry has no ``:shape`` part or a
            dimension is not an integer.
    """
    sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
    dense_slots = envs.get_global_env("dense_slots", None, "train.reader")

    if sparse_slots is not None or dense_slots is not None:
        # Only one of the two may be configured: treat the missing one as an
        # empty list instead of calling .strip() on None.
        sparse_names = sparse_slots.split() if sparse_slots else []
        dense_specs = dense_slots.split() if dense_slots else []

        self._dense_data_var = []
        for spec in dense_specs:
            name, sep, shape_text = spec.partition(":")
            if not sep:
                raise ValueError(
                    "dense slot '{}' must look like name:dim1,dim2,...".format(spec))
            shape = [int(dim) for dim in shape_text.strip("[]").split(",")]
            var = fluid.layers.data(name=name, shape=shape, dtype="float32")
            self._data_var.append(var)
            self._dense_data_var.append(var)

        self._sparse_data_var = []
        for name in sparse_names:
            var = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64")
            self._data_var.append(var)
            self._sparse_data_var.append(var)

    # NOTE(review): placed at function level (runs whether or not slots are
    # configured); the original nesting is not recoverable from this view.
    dataset_class = envs.get_global_env("dataset_class", None, "train.reader")
    if dataset_class == "DataLoader":
        self._init_dataloader()
def _init_dataloader(self):
    """Build ``self._data_loader`` from the input variables in ``self._data_var``.

    The loader is created via ``fluid.io.DataLoader.from_generator`` with a
    capacity of 64, double buffering disabled and ``iterable=False``.
    """
    loader_options = dict(
        feed_list=self._data_var,
        capacity=64,
        use_double_buffer=False,
        iterable=False,
    )
    self._data_loader = fluid.io.DataLoader.from_generator(**loader_options)
def get_inputs(self):
    """Return the list of input variables held in ``self._data_var``.

    The list object itself is returned (not a copy).
    """
    inputs = self._data_var
    return inputs
......
......@@ -13,6 +13,7 @@
# limitations under the License.
from __future__ import print_function
import sys
import abc
import os
......@@ -44,3 +45,58 @@ class Reader(dg.MultiSlotDataGenerator):
@abc.abstractmethod
def generate_sample(self, line):
pass
class SlotReader(dg.MultiSlotDataGenerator):
    """Data generator for text lines made of space-separated ``slot:feasign`` tokens.

    Slots are configured through :meth:`init`. For every input line
    :meth:`generate_sample` produces one sample: a list of
    ``(slot_name, values)`` pairs, dense slots first and sparse slots after,
    each in configuration order. Slots that do not occur in a line are filled
    with the padding value.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """Load the YAML file at ``config`` into the global envs.

        Raises:
            ValueError: if ``config`` is not an existing file.
        """
        dg.MultiSlotDataGenerator.__init__(self)
        if not os.path.isfile(config):
            raise ValueError("reader config only support yaml")
        with open(config, 'r') as rb:
            _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
        envs.set_global_envs(_config)
        envs.update_workspace()

    def init(self, sparse_slots, dense_slots, padding=0):
        """Configure the slots to extract.

        Args:
            sparse_slots: space-separated sparse slot names (values parsed
                with ``int``); ``None`` or empty means no sparse slots.
            dense_slots: space-separated ``name:dim1,dim2,...`` entries
                (values parsed with ``float``); ``None`` or empty means no
                dense slots.
            padding: value used to fill slots missing from a line.
        """
        # ``reduce`` is a builtin only on Python 2; functools.reduce exists on
        # both Python 2.6+ and Python 3.
        from functools import reduce
        from operator import mul

        self.sparse_slots = sparse_slots.split() if sparse_slots else []
        dense_specs = dense_slots.split() if dense_slots else []
        # Flattened length of each dense slot: product of its declared dims.
        self.dense_slots_shape = [
            reduce(mul, [int(dim) for dim in spec.split(":")[1].strip("[]").split(",")])
            for spec in dense_specs
        ]
        self.dense_slots = [spec.split(":")[0] for spec in dense_specs]
        self.slots = self.dense_slots + self.sparse_slots
        self.slot2index = {}
        # Kept for backward compatibility; per-sample bookkeeping is now local
        # to generate_sample, so every flag stays False.
        self.visit = {}
        for index, slot in enumerate(self.slots):
            self.slot2index[slot] = index
            self.visit[slot] = False
        self.padding = padding

    def generate_sample(self, l):
        """Return a generator function yielding the parsed sample for line ``l``.

        Tokens whose slot name is not configured are ignored. A missing dense
        slot is padded with ``padding`` repeated to the slot's flattened
        length; a missing sparse slot is padded with a single ``padding``.
        """
        def reader():
            output = [(slot, []) for slot in self.slots]
            # Local bookkeeping: a parse error on one line can no longer leave
            # stale "visited" flags behind for the next line.
            seen = set()
            for token in l.strip().split(" "):
                slot_feasign = token.split(":")
                slot = slot_feasign[0]
                if slot not in self.slot2index:
                    continue
                if slot in self.sparse_slots:
                    feasign = int(slot_feasign[1])
                else:
                    feasign = float(slot_feasign[1])
                output[self.slot2index[slot]][1].append(feasign)
                seen.add(slot)
            for slot, index in self.slot2index.items():
                if slot in seen:
                    continue
                # Dense slots come first in self.slots, so their slot index is
                # also their index into dense_slots_shape.
                if slot in self.dense_slots:
                    width = self.dense_slots_shape[index]
                else:
                    width = 1
                output[index][1].extend([self.padding] * width)
            yield output

        return reader
......@@ -23,6 +23,7 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import f
from paddlerec.core.trainer import Trainer
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
class TranspileTrainer(Trainer):
......@@ -50,14 +51,22 @@ class TranspileTrainer(Trainer):
namespace = "evaluate.reader"
class_name = "EvaluateReader"
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
print("batch_size: {}".format(batch_size))
if sparse_slots is None and dense_slots is None:
reader_class = envs.get_global_env("class", None, namespace)
reader = dataloader_instance.dataloader(
reader_class, state, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class, class_name)
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader("", state, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
......@@ -93,13 +102,23 @@ class TranspileTrainer(Trainer):
train_data_path = envs.get_global_env(
"test_data_path", None, namespace)
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
if sparse_slots is None and dense_slots is None:
pipe_cmd = "python {} {} {} {}".format(
reader, reader_class, state, self._config_yaml)
else:
padding = envs.get_global_env("padding", 0, namespace)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, namespace, \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
if train_data_path.startswith("paddlerec::"):
package_base = envs.get_runtime_environ("PACKAGE_BASE")
......@@ -147,9 +166,6 @@ class TranspileTrainer(Trainer):
if not need_save(epoch_id, save_interval, False):
return
# print("save inference model is not supported now.")
# return
feed_varnames = envs.get_global_env(
"save.inference.feed_varnames", None, namespace)
fetch_varnames = envs.get_global_env(
......
......@@ -18,6 +18,7 @@ import os
from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader
def dataloader(readerclass, train, yaml_file):
......@@ -62,3 +63,49 @@ def dataloader(readerclass, train, yaml_file):
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def slotdataloader(readerclass, train, yaml_file):
    """Build a sample reader over slot-formatted data files using ``SlotReader``.

    Args:
        readerclass: unused; kept so the signature parallels ``dataloader``.
        train: ``"TRAIN"`` selects the ``train.reader`` namespace and
            ``train_data_path``; any other value selects ``evaluate.reader``
            and ``test_data_path``.
        yaml_file: path of the YAML config passed to ``SlotReader``.

    Returns:
        A generator function that yields, for every line of every file in the
        data directory, the list of per-slot value lists produced by
        ``SlotReader.generate_sample``. If the reader object exposes
        ``generate_batch_from_trainfiles``, the result of calling it on the
        file list is returned instead.
    """
    if train == "TRAIN":
        namespace = "train.reader"
        data_path = get_global_env("train_data_path", None, namespace)
    else:
        namespace = "evaluate.reader"
        data_path = get_global_env("test_data_path", None, namespace)

    # "paddlerec::<relative path>" is resolved against PACKAGE_BASE.
    if data_path.startswith("paddlerec::"):
        package_base = get_runtime_environ("PACKAGE_BASE")
        assert package_base is not None
        data_path = os.path.join(package_base, data_path.split("::")[1])

    files = [os.path.join(str(data_path), name) for name in os.listdir(data_path)]

    sparse = get_global_env("sparse_slots", None, namespace)
    dense = get_global_env("dense_slots", None, namespace)
    padding = get_global_env("padding", 0, namespace)
    reader = SlotReader(yaml_file)
    reader.init(sparse, dense, int(padding))

    def gen_reader():
        for path in files:
            with open(path, 'r') as data_file:
                for line in data_file:
                    sample_iter = reader.generate_sample(line.rstrip('\n'))
                    for parsed_line in sample_iter():
                        if parsed_line is None:
                            continue
                        # Keep only the values; slot order is fixed by the reader.
                        yield [slot_pair[1] for slot_pair in parsed_line]

    def gen_batch_reader():
        return reader.generate_batch_from_trainfiles(files)

    if hasattr(reader, 'generate_batch_from_trainfiles'):
        return gen_batch_reader()

    return gen_reader
......@@ -16,19 +16,33 @@ from __future__ import print_function
import sys
from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import envs
if len(sys.argv) != 4:
raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate 3.yaml_abs_path")
if len(sys.argv) < 4:
raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate/slotreader 3.yaml_abs_path")
reader_package = sys.argv[1]
if sys.argv[2] == "TRAIN":
if sys.argv[2].upper() == "TRAIN":
reader_name = "TrainReader"
else:
elif sys.argv[2].upper() == "EVALUATE":
reader_name = "EvaluateReader"
else:
reader_name = "SlotReader"
namespace = sys.argv[4]
sparse_slots = sys.argv[5].replace("#", " ")
dense_slots = sys.argv[6].replace("#", " ")
padding = int(sys.argv[7])
yaml_abs_path = sys.argv[3]
reader_class = lazy_instance_by_fliename(reader_package, reader_name)
reader = reader_class(yaml_abs_path)
reader.init()
reader.run_from_stdin()
if reader_name != "SlotReader":
reader_class = lazy_instance_by_fliename(reader_package, reader_name)
reader = reader_class(yaml_abs_path)
reader.init()
reader.run_from_stdin()
else:
reader = SlotReader(yaml_abs_path)
reader.init(sparse_slots, dense_slots, padding)
reader.run_from_stdin()
# PaddleRec 推荐数据集格式
当你的数据集格式为[slot:feasign]*这种模式,或者可以预处理为这种格式时,可以直接使用PaddleRec内置的Reader。
好处是不用自己写Reader了,各个model之间的数据格式也都可以统一成一样的格式。
## 数据格式说明
假如你的原始数据格式为
```bash
<label> <integer feature 1> ... <integer feature 13> <categorical feature 1> ... <categorical feature 26>
```
其中```<label>```表示广告是否被点击,点击用1表示,未点击用0表示。```<integer feature>```代表数值特征(连续特征),共有13个连续特征。
并且每个特征有一个特征值。
```<categorical feature>```代表分类特征(离散特征),共有26个离散特征。相邻两个特征用```\t```分隔。
假设这13个连续特征(dense slot)的name如下:
```
D1 D2 D3 D4 D5 D6 D7 D8 D9 D10 D11 D12 D13
```
这26个离散特征(sparse slot)的name如下:
```
S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26
```
那么下面这条样本(1个label + 13个dense值 + 26个feasign)
```
1 0.1 0.4 0.2 0.3 0.5 0.8 0.3 0.2 0.1 0.5 0.6 0.3 0.9 60 16 91 50 52 52 28 69 63 33 87 69 48 59 27 12 95 36 37 41 17 3 86 19 88 60
```
可以转换成:
```
label:1 D1:0.1 D2:0.4 D3:0.2 D4:0.3 D5:0.5 D6:0.8 D7:0.3 D8:0.2 D9:0.1 D10:0.5 D11:0.6 D12:0.3 D13:0.9 S1:60 S2:16 S3:91 S4:50 S5:52 S6:52 S7:28 S8:69 S9:63 S10:33 S11:87 S12:69 S13:48 S14:59 S15:27 S16:12 S17:95 S18:36 S19:37 S20:41 S21:17 S22:3 S23:86 S24:19 S25:88 S26:60
```
注意:上面各个slot:feasign字段之间的顺序没有要求,比如```D1:0.1 D2:0.4```改成```D2:0.4 D1:0.1```也可以。
## 配置
reader中需要配置```sparse_slots```与```dense_slots```,例如
```
workspace: xxxx
reader:
batch_size: 2
train_data_path: "{workspace}/data/train_data"
sparse_slots: "label S1 S2 S3 S4 S5 S6 S7 S8 S9 S10 S11 S12 S13 S14 S15 S16 S17 S18 S19 S20 S21 S22 S23 S24 S25 S26"
dense_slots: "D1:1 D2:1 D3:1 D4:1 D5:1 D6:1 D7:1 D8:1 D9:1 D10:1 D11:1 D12:1 D13:1"
model:
xxxxx
```
sparse_slots表示稀疏特征的列表,以空格分开。
dense_slots表示稠密特征的列表,以空格分开。每个字段的格式是```[dense_slot_name]:[dim1,dim2,dim3...]```,其中```dim1,dim2,dim3...```表示shape
配置好了之后,这些slot对应的variable就可以在model中通过如下变量访问啦:
```
self._sparse_data_var
self._dense_data_var
```
# PaddleRec 自定义数据集及Reader
用户自定义数据集及配置异步Reader,需要关注以下几个步骤:
......
......@@ -71,13 +71,13 @@ python text2paddle.py raw_big_train_data/ raw_big_test_data/ train_big_data test
### 训练
```
python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification -d cpu -e single
python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification
```
### 预测
```
python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification -d cpu -e single
python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification
```
## 效果对比
......@@ -88,18 +88,3 @@ python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification
| ag news dataset | TagSpace | -- | -- | -- | -- |
| -- | Classification | -- | -- | -- | -- |
## 分布式
### 模型训练性能 (样本/s)
| 数据集 | 模型 | 单机 | 同步 (4节点) | 同步 (8节点) | 同步 (16节点) | 同步 (32节点) |
| :------------------: | :--------------------: | :---------: |:---------: |:---------: |:---------: |:---------: |
| -- | TagSpace | -- | -- | -- | -- | -- |
| -- | Classification | -- | -- | -- | -- | -- |
----
| 数据集 | 模型 | 单机 | 异步 (4节点) | 异步 (8节点) | 异步 (16节点) | 异步 (32节点) |
| :------------------: | :--------------------: | :---------: |:---------: |:---------: |:---------: |:---------: |
| -- | TagSpace | -- | -- | -- | -- | -- |
| -- | Classification | -- | -- | -- | -- | -- |
......@@ -22,9 +22,10 @@ train:
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train"
train_data_path: "{workspace}/data/slot_train"
feat_dict_name: "{workspace}/data/vocab"
sparse_slots: "label C1 C2 C3 C4 C5 C6 C7 C8 C9 C10 C11 C12 C13 C14 C15 C16 C17 C18 C19 C20 C21 C22 C23 C24 C25 C26"
dense_slots: "I1:1 I2:1 I3:1 I4:1 I5:1 I6:1 I7:1 I8:1 I9:1 I10:1 I11:1 I12:1 I13:1"
model:
models: "{workspace}/model.py"
......
......@@ -11,21 +11,32 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import math
import sys
import yaml
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
import math
import os
try:
import cPickle as pickle
except ImportError:
import pickle
from collections import Counter
import os
import paddle.fluid.incubate.data_generator as dg
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
class TrainReader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
class TrainReader(Reader):
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
......@@ -48,7 +59,7 @@ class TrainReader(Reader):
self.cat_feat_idx_dict_list = [{} for _ in range(26)]
# TODO: set vocabulary dictionary
vocab_dir = envs.get_global_env("feat_dict_name", None, "train.reader")
vocab_dir = "./vocab/"
for i in range(26):
lookup_idx = 1 # remain 0 for default value
for line in open(
......@@ -87,6 +98,17 @@ class TrainReader(Reader):
def data_iter():
label_feat_list = self._process_line(line)
yield list(zip(self.label_feat_names, label_feat_list))
s = ""
for i in list(zip(self.label_feat_names, label_feat_list)):
k = i[0]
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
reader = TrainReader("../config.yaml")
reader.init()
reader.run_from_stdin()
# Download and preprocess the raw data, then convert every file of the
# train / test_valid splits into the "slot:feasign" text format.
python download.py
python preprocess.py

# convert_dir SRC_DIR DST_DIR
# Pipe each regular file in SRC_DIR through get_slot_data.py and write the
# result to a file of the same name in DST_DIR.
convert_dir() {
    src_dir=$1
    dst_dir=$2
    # -p: do not fail when the script is re-run and the directory exists.
    mkdir -p "$dst_dir"
    # Glob instead of parsing `ls` output so names with spaces survive.
    for src_file in "$src_dir"/*
    do
        # An unmatched glob stays literal; skip it (and sub-directories).
        [ -f "$src_file" ] || continue
        python get_slot_data.py < "$src_file" > "$dst_dir/$(basename "$src_file")"
    done
}

convert_dir ./train slot_train
convert_dir ./test_valid slot_test_valid
......@@ -31,6 +31,11 @@ class Model(ModelBase):
self.dnn_use_bn = envs.get_global_env("hyper_parameters.dnn_use_bn", None, self._namespace)
self.clip_by_norm = envs.get_global_env("hyper_parameters.clip_by_norm", None, self._namespace)
cat_feat_num = envs.get_global_env("hyper_parameters.cat_feat_num", None, self._namespace)
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_inputs = self._dense_data_var
self.target_input = self._sparse_data_var[0]
cat_feat_dims_dict = OrderedDict()
for line in open(cat_feat_num):
spls = line.strip().split()
......@@ -40,8 +45,8 @@ class Model(ModelBase):
)
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse", None, self._namespace)
self.dense_feat_names = ['I' + str(i) for i in range(1, 14)]
self.sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
self.dense_feat_names = [i.name for i in self.dense_inputs]
self.sparse_feat_names = [i.name for i in self.sparse_inputs]
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
......@@ -51,21 +56,17 @@ class Model(ModelBase):
self.net_input = None
self.loss = None
def _create_embedding_input(self, data_dict):
def _create_embedding_input(self):
# sparse embedding
sparse_emb_dict = OrderedDict((name, fluid.embedding(
input=fluid.layers.cast(
data_dict[name], dtype='int64'),
size=[
self.feat_dims_dict[name] + 1,
6 * int(pow(self.feat_dims_dict[name], 0.25))
],
is_sparse=self.is_sparse)) for name in self.sparse_feat_names)
sparse_emb_dict = OrderedDict()
for var in self.sparse_inputs:
sparse_emb_dict[var.name] = fluid.embedding(input=var,
size=[self.feat_dims_dict[var.name] + 1,
6 * int(pow(self.feat_dims_dict[var.name], 0.25))
],is_sparse=self.is_sparse)
# combine dense and sparse_emb
dense_input_list = [
data_dict[name] for name in data_dict if name.startswith('I')
]
dense_input_list = self.dense_inputs
sparse_emb_list = list(sparse_emb_dict.values())
sparse_input = fluid.layers.concat(sparse_emb_list, axis=-1)
......@@ -111,15 +112,10 @@ class Model(ModelBase):
return fluid.layers.reduce_sum(fluid.layers.square(w))
def train_net(self):
self.model._init_slots()
self.init_network()
self.target_input = fluid.data(
name='label', shape=[None, 1], dtype='float32')
data_dict = OrderedDict()
for feat_name in self.feat_dims_dict:
data_dict[feat_name] = fluid.data(
name=feat_name, shape=[None, 1], dtype='float32')
self.net_input = self._create_embedding_input(data_dict)
self.net_input = self._create_embedding_input()
deep_out = self._deep_net(self.net_input, self.dnn_hidden_units, self.dnn_use_bn, False)
......@@ -130,9 +126,6 @@ class Model(ModelBase):
logit = fluid.layers.fc(last_out, 1)
self.prob = fluid.layers.sigmoid(logit)
self._data_var = [self.target_input] + [
data_dict[dense_name] for dense_name in self.dense_feat_names
] + [data_dict[sparse_name] for sparse_name in self.sparse_feat_names]
# auc
prob_2d = fluid.layers.concat([1 - self.prob, self.prob], 1)
......@@ -143,7 +136,7 @@ class Model(ModelBase):
self._metrics["BATCH_AUC"] = batch_auc_var
# logloss
logloss = fluid.layers.log_loss(self.prob, self.target_input)
logloss = fluid.layers.log_loss(self.prob, fluid.layers.cast(self.target_input, dtype='float32'))
self.avg_logloss = fluid.layers.reduce_mean(logloss)
# reg_coeff * l2_reg_cross
......@@ -157,4 +150,5 @@ class Model(ModelBase):
return optimizer
def infer_net(self, parameter_list):
self.model._init_slots()
self.deepfm_net()
......@@ -22,9 +22,10 @@ train:
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train_data"
feat_dict_name: "{workspace}/data/aid_data/feat_dict_10.pkl2"
train_data_path: "{workspace}/data/slot_train_data"
feat_dict_name: "{workspace}/data/feat_dict_10.pkl2"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
model:
models: "{workspace}/model.py"
......
......@@ -12,18 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import yaml
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
class TrainReader(dg.MultiSlotDataGenerator):
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
class TrainReader(Reader):
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
......@@ -37,7 +43,7 @@ class TrainReader(Reader):
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
# load preprocessed feature dict
self.feat_dict_name = envs.get_global_env("feat_dict_name", None, "train.reader")
self.feat_dict_name = "aid_data/feat_dict_10.pkl2"
self.feat_dict_ = pickle.load(open(self.feat_dict_name, 'rb'))
def _process_line(self, line):
......@@ -70,6 +76,16 @@ class TrainReader(Reader):
def data_iter():
feat_idx, feat_value, label = self._process_line(line)
yield [('feat_idx', feat_idx), ('feat_value', feat_value), ('label', label)]
s = ""
for i in [('feat_idx', feat_idx), ('feat_value', feat_value), ('label', label)]:
k = i[0]
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
reader = TrainReader("../config.yaml")
reader.init()
reader.run_from_stdin()
# Download and preprocess the raw data, then convert every file of the
# train_data / test_data splits into the "slot:feasign" text format.
python download_preprocess.py

# convert_dir SRC_DIR DST_DIR
# Pipe each regular file in SRC_DIR through get_slot_data.py and write the
# result to a file of the same name in DST_DIR.
convert_dir() {
    src_dir=$1
    dst_dir=$2
    # -p: do not fail when the script is re-run and the directory exists.
    mkdir -p "$dst_dir"
    # Glob instead of parsing `ls` output so names with spaces survive.
    for src_file in "$src_dir"/*
    do
        # An unmatched glob stays literal; skip it (and sub-directories).
        [ -f "$src_file" ] || continue
        python get_slot_data.py < "$src_file" > "$dst_dir/$(basename "$src_file")"
    done
}

convert_dir ./train_data slot_train_data
convert_dir ./test_data slot_test_data
......@@ -33,23 +33,13 @@ class Model(ModelBase):
# ------------------------- network input --------------------------
num_field = envs.get_global_env("hyper_parameters.num_field", None, self._namespace)
raw_feat_idx = fluid.data(name='feat_idx', shape=[None, num_field],
dtype='int64') # None * num_field(defalut:39)
raw_feat_value = fluid.data(name='feat_value', shape=[None, num_field], dtype='float32') # None * num_field
self.label = fluid.data(name='label', shape=[None, 1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx, [-1, 1]) # (None * num_field) * 1
feat_value = fluid.layers.reshape(raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
# ------------------------- set _data_var --------------------------
self._data_var.append(raw_feat_idx)
self._data_var.append(raw_feat_value)
self._data_var.append(self.label)
if self._platform != "LINUX":
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
# ------------------------- first order term --------------------------
feat_idx = raw_feat_idx
feat_value = fluid.layers.reshape(raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
reg = envs.get_global_env("hyper_parameters.reg", 1e-4, self._namespace)
first_weights_re = fluid.embedding(
......@@ -134,11 +124,12 @@ class Model(ModelBase):
self.predict = fluid.layers.sigmoid(y_first_order + y_second_order + y_dnn)
def train_net(self):
self.model._init_slots()
self.deepfm_net()
# ------------------------- Cost(logloss) --------------------------
cost = fluid.layers.log_loss(input=self.predict, label=self.label)
cost = fluid.layers.log_loss(input=self.predict, label=fluid.layers.cast(self.label, "float32"))
avg_cost = fluid.layers.reduce_sum(cost)
self._cost = avg_cost
......@@ -159,4 +150,5 @@ class Model(ModelBase):
return optimizer
def infer_net(self, parameter_list):
self.model._init_slots()
self.deepfm_net()
......@@ -23,9 +23,10 @@ train:
reader:
batch_size: 2
class: "{workspace}/../criteo_reader.py"
train_data_path: "{workspace}/data/train"
train_data_path: "{workspace}/data/slot_train_data"
reader_debug_mode: False
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
model:
models: "{workspace}/model.py"
......
# Download the CTR data set, keep the full splits under *_full and copy a few
# part files into train_data / test_data for rapid verification.

# Stop at the first failure: continuing after a failed download or extract
# would only move/copy missing files.
set -e

# NOTE(review): --no-check-certificate disables TLS verification of the
# download host; kept as in the original, confirm it is still required.
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz

mv ./raw_data ./train_data_full
# Copy without cd-ing: the original `mkdir d && cd d` / `cp ... && cd ..`
# pair left the script in the parent directory when `d` already existed.
mkdir -p train_data
cp train_data_full/part-0 train_data_full/part-1 train_data/

mv ./test_data ./test_data_full
mkdir -p test_data
cp test_data_full/part-220 test_data/

echo "Complete data download."
echo "Full Train data stored in ./train_data_full "
echo "Full Test data stored in ./test_data_full "
echo "Rapid Verification train data stored in ./train_data "
echo "Rapid Verification test data stored in ./test_data "
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
# Normalisation constants for the 13 continuous columns; entry k belongs to
# input column k + 1 (the reader indexes them with ``idx - 1``).
# cont_min_ is subtracted from the raw value before scaling.
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
# Not referenced by the code visible in this script.
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
# Divisor used for scaling; element-wise equal to cont_max_ - cont_min_.
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
# Modulus applied to the hash of each categorical value.
hash_dim_ = 1000001
# Indices into the tab-split input line: column 0 is the label, columns 1-13
# are continuous features, columns 14-39 are categorical features.
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
    """
    Convert raw tab-separated Criteo lines into "slot:feasign" text lines.

    Inherits MultiSlotDataGenerator only to reuse its stdin driver: every
    sample is printed to stdout and ``None`` is yielded in place of a sample.
    Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
    """

    def generate_sample(self, line):
        """
        Return a generator function that prints the converted form of ``line``.

        Output format, space separated:
        ``click:<label>``, then one ``dense_feature:<value>`` per continuous
        column, then ``<k>:<hashed id>`` for the k-th categorical column
        (k = 1..26).
        """

        def reader():
            features = line.rstrip('\n').split('\t')

            # Continuous columns: empty -> 0.0, otherwise (x - min) / diff.
            dense_feature = []
            for idx in continuous_range_:
                if features[idx] == "":
                    dense_feature.append(0.0)
                else:
                    dense_feature.append(
                        (float(features[idx]) - cont_min_[idx - 1]) /
                        cont_diff_[idx - 1])

            # NOTE(review): builtin hash() of a str is randomised per process
            # on Python 3 unless PYTHONHASHSEED is fixed; ids are then not
            # comparable between separate runs of this script.
            sparse_feature = [
                hash(str(idx) + features[idx]) % hash_dim_
                for idx in categorical_range_
            ]
            label = int(features[0])

            # NOTE(review): a reader config elsewhere in this change declares
            # the dense slot as "dense_var:13" while "dense_feature" is
            # emitted here - confirm the names are meant to match.
            fields = ["click:" + str(label)]
            fields.extend("dense_feature:" + str(value) for value in dense_feature)
            fields.extend(
                str(slot) + ":" + str(feasign)
                for slot, feasign in enumerate(sparse_feature, 1))
            # Function-call form prints the same text on Python 2 and 3.
            print(" ".join(fields))
            yield None

        return reader
# Script entry: feed the input from stdin through CriteoDataset; the converted
# lines are printed by generate_sample itself.
d = CriteoDataset()
d.run_from_stdin()
# Download the data, then convert the full and the rapid-verification
# train/test splits into the "slot:feasign" text format.
sh download.sh

# convert_dir SRC_DIR DST_DIR
# Pipe each regular file in SRC_DIR through get_slot_data.py and write the
# result to a file of the same name in DST_DIR.
convert_dir() {
    src_dir=$1
    dst_dir=$2
    # -p: do not fail when the script is re-run and the directory exists.
    mkdir -p "$dst_dir"
    # Glob instead of parsing `ls` output so names with spaces survive.
    for src_file in "$src_dir"/*
    do
        # An unmatched glob stays literal; skip it (and sub-directories).
        [ -f "$src_file" ] || continue
        python get_slot_data.py < "$src_file" > "$dst_dir/$(basename "$src_file")"
    done
}

convert_dir ./train_data_full slot_train_data_full
convert_dir ./test_data_full slot_test_data_full
convert_dir ./train_data slot_train_data
convert_dir ./test_data slot_test_data
此差异已折叠。
此差异已折叠。
......@@ -25,43 +25,9 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def input(self):
def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace)
sparse_input_ids = [
fluid.layers.data(name="S" + str(i),
shape=[1],
lod_level=1,
dtype="int64") for i in range(1, ids)
]
return sparse_input_ids
def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace)
dense_input_var = fluid.layers.data(name="D",
shape=[dim],
dtype="float32")
return dense_input_var
def label_input():
label = fluid.layers.data(name="click", shape=[1], dtype="int64")
return label
self.sparse_inputs = sparse_inputs()
self.dense_input = dense_input()
self.label_input = label_input()
self._data_var.append(self.dense_input)
for input in self.sparse_inputs:
self._data_var.append(input)
self._data_var.append(self.label_input)
if self._platform != "LINUX":
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
self.sparse_inputs = self._sparse_data_var[1:]
self.dense_input = self._dense_data_var[0]
self.label_input = self._sparse_data_var[0]
def net(self):
is_distributed = True if envs.get_trainer() == "CtrTrainer" else False
......@@ -122,6 +88,7 @@ class Model(ModelBase):
self._metrics["BATCH_AUC"] = batch_auc
def train_net(self):
self.model._init_slots()
self.input()
self.net()
self.avg_loss()
......@@ -133,5 +100,6 @@ class Model(ModelBase):
return optimizer
def infer_net(self):
self.model._init_slots()
self.input()
self.net()
......@@ -59,6 +59,11 @@
## 使用教程
### 数据处理
参考每个模型目录数据下载&预处理脚本
```
sh run.sh
```
### 训练
```
python -m paddlerec.run -m paddlerec.models.rank.dnn # 以DNN为例
......
......@@ -22,8 +22,9 @@ train:
reader:
batch_size: 2
class: "{workspace}/reader.py"
train_data_path: "{workspace}/data/train_data"
train_data_path: "{workspace}/data/slot_train_data"
sparse_slots: "label"
dense_slots: "wide_input:8 deep_input:58"
model:
models: "{workspace}/model.py"
......
mkdir train_data
mkdir test_data
mkdir data
train_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/data/adult.data"
test_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/data/adult.test"
train_data_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/train_data/train_data.csv"
test_data_path="/home/yaoxuefeng/repos/models/models/PaddleRec/ctr/wide_deep/test_data/test_data.csv"
train_path="adult.data"
test_path="adult.test"
train_data_path="./train_data/train_data.csv"
test_data_path="./test_data/test_data.csv"
#pip install -r requirements.txt
pip install -r requirements.txt
#wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
#wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
......
import os
import io
import args
import pandas as pd
from sklearn import preprocessing
def _clean_file(source_path, target_path):
    """Rewrite a raw comma-separated file so that it is plain CSV.

    For every line of ``source_path``: surrounding whitespace is stripped,
    ``", "`` separators are collapsed to ``","``, lines that are empty or
    contain no comma are dropped, and a single trailing ``.`` is removed.
    The surviving lines are written to ``target_path``, each terminated by
    a newline.

    Args:
        source_path: path of the raw input file.
        target_path: path of the cleaned file to create (overwritten).
    """
    with io.open(source_path, 'r') as raw_file, \
            io.open(target_path, 'w') as csv_file:
        for line in raw_file:
            line = line.strip().replace(', ', ',')
            # Skip blank lines and non-record lines (no field separator).
            if not line or ',' not in line:
                continue
            if line.endswith('.'):
                line = line[:-1]
            csv_file.write(line + '\n')
def _encode_shared(train_df, test_df, columns):
    """Label-encode ``columns`` in place in both frames, one encoder per column.

    Each encoder is fitted on the union of the train and test values so that
    a given category receives the same integer id in both splits.
    """
    for col in columns:
        encoder = preprocessing.LabelEncoder()
        encoder.fit(pd.concat([train_df[col], test_df[col]], ignore_index=True))
        train_df[col] = encoder.transform(train_df[col])
        test_df[col] = encoder.transform(test_df[col])


def build_model_columns(train_data_path, test_data_path):
    """Turn the cleaned census CSV files into wide/deep feature tables.

    Both files are read, feature-engineered and then overwritten in place
    with the columns ``wide_columns + deep_columns + ['label']`` (missing
    values filled with 0). The number of wide and deep columns is written,
    one per line, to ``train_data/columns.txt`` and ``test_data/columns.txt``.

    Args:
        train_data_path: cleaned training CSV (no header); overwritten.
        test_data_path: cleaned test CSV (no header); overwritten.
    """
    # The column names are from
    # https://www2.1010data.com/documentationcenter/prod/Tutorials/MachineLearningExamples/CensusIncomeDataSet.html
    column_names = [
        'age', 'workclass', 'fnlwgt', 'education', 'education_num',
        'marital_status', 'occupation', 'relationship', 'race', 'gender',
        'capital_gain', 'capital_loss', 'hours_per_week', 'native_country',
        'income_bracket'
    ]

    # Load the dataset in Pandas
    train_df = pd.read_csv(
        train_data_path,
        delimiter=',',
        header=None,
        index_col=None,
        names=column_names)
    test_df = pd.read_csv(
        test_data_path,
        delimiter=',',
        header=None,
        index_col=None,
        names=column_names)

    categorical_columns = ['education', 'marital_status', 'relationship', 'workclass', 'occupation']
    # Shared encoders: independent per-split encoders would give the same
    # category different ids whenever the two splits' value sets differ.
    _encode_shared(train_df, test_df, categorical_columns)

    bins = [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]
    train_df['age_buckets'] = pd.cut(train_df['age'].values.tolist(), bins, labels=False)
    test_df['age_buckets'] = pd.cut(test_df['age'].values.tolist(), bins, labels=False)

    base_columns = ['education', 'marital_status', 'relationship', 'workclass', 'occupation', 'age_buckets']

    # Crossed features are built as "<a>_<b>" strings, then label-encoded.
    for df in (train_df, test_df):
        df['education_occupation'] = (
            df['education'].astype(str) + '_' + df['occupation'].astype(str))
        df['age_buckets_education_occupation'] = (
            df['age_buckets'].astype(str) + '_' + df['education'].astype(str) +
            '_' + df['occupation'].astype(str))

    crossed_columns = ['education_occupation', 'age_buckets_education_occupation']
    _encode_shared(train_df, test_df, crossed_columns)

    wide_columns = base_columns + crossed_columns

    train_df_temp = pd.get_dummies(train_df[categorical_columns], columns=categorical_columns)
    test_df_temp = pd.get_dummies(test_df[categorical_columns], columns=categorical_columns)
    # Give the test split exactly the one-hot columns of the train split so
    # the column selection below cannot hit a missing column.
    test_df_temp = test_df_temp.reindex(columns=train_df_temp.columns, fill_value=0)
    train_df = train_df.join(train_df_temp)
    test_df = test_df.join(test_df_temp)

    deep_columns = list(train_df_temp.columns) + [
        'age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']

    train_df['label'] = train_df['income_bracket'].apply(lambda x: 1 if x == '>50K' else 0)
    test_df['label'] = test_df['income_bracket'].apply(lambda x: 1 if x == '>50K' else 0)

    # Text-mode io.open requires unicode on Python 2, hence the u'' literal.
    column_counts = u'{}\n{}\n'.format(len(wide_columns), len(deep_columns))
    for columns_path in ('train_data/columns.txt', 'test_data/columns.txt'):
        with io.open(columns_path, 'w') as f:
            f.write(column_counts)

    output_columns = wide_columns + deep_columns + ['label']
    train_df[output_columns].fillna(0).to_csv(train_data_path, index=False)
    test_df[output_columns].fillna(0).to_csv(test_data_path, index=False)
def clean_file(train_path, test_path, train_data_path, test_data_path):
    """Run ``_clean_file`` on the training file, then on the test file.

    Args:
        train_path: raw training input passed to ``_clean_file``.
        test_path: raw test input passed to ``_clean_file``.
        train_data_path: destination passed with ``train_path``.
        test_data_path: destination passed with ``test_path``.
    """
    # Tuple order keeps the original sequence: train first, test second.
    jobs = (
        (train_path, train_data_path),
        (test_path, test_data_path),
    )
    for source, destination in jobs:
        _clean_file(source, destination)
if __name__ == '__main__':
    # `args` is rebound from the object providing parse_args() (presumably
    # the argparse parser defined earlier in the file) to the parsed result.
    args = args.parse_args()
    # Both steps below operate on the same pair of output paths.
    _output_paths = (args.train_data_path, args.test_data_path)
    clean_file(args.train_path, args.test_path, *_output_paths)
    build_model_columns(*_output_paths)
......@@ -11,18 +11,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import yaml
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
import paddle.fluid.incubate.data_generator as dg
from paddlerec.core.reader import Reader
class TrainReader(dg.MultiSlotDataGenerator):
def __init__(self, config):
    """Initialize the data generator and check that the reader config loads.

    Args:
        config: path to the YAML reader configuration file.

    Raises:
        ValueError: if ``config`` is not an existing regular file.
    """
    # Imported locally: none of the import lines visible at the top of this
    # file bring `os` into scope, so os.path.isfile would raise NameError.
    import os

    dg.MultiSlotDataGenerator.__init__(self)

    if not os.path.isfile(config):
        raise ValueError("reader config only support yaml")
    with open(config, 'r') as rb:
        # The parsed mapping is not used any further in this constructor;
        # loading it still surfaces a malformed config at construction time.
        yaml.load(rb.read(), Loader=yaml.FullLoader)
class TrainReader(Reader):
def init(self):
    """No-op hook: the body contains only ``pass``."""
    pass
......@@ -41,6 +48,18 @@ class TrainReader(Reader):
def data_iter():
    """Parse the enclosing ``line`` and print it as ``slot:value`` text.

    Each value of each slot is printed as ``name:value``, separated by
    single spaces, on one line of standard output.
    """
    wide_feat, deep_deat, label = self._process_line(line)
    slots = (('wide_input', wide_feat), ('deep_input', deep_deat), ('label', label))
    # NOTE(review): this yield sits next to the slot-text output in the merge
    # diff and is presumably the removed pre-change line; confirm, and drop
    # it if only the printed slot text is wanted.
    yield list(slots)
    # Join once instead of growing a string with += inside nested loops.
    text = " ".join(
        name + ":" + str(value) for name, values in slots for value in values)
    # Call form of print: the statement form `print s.strip()` is a
    # SyntaxError on Python 3 and under `from __future__ import print_function`.
    print(text.strip())
    # The text line was written directly above; None is yielded in place of
    # a sample.
    yield None
return data_iter
# Script body: build the reader from the config file one directory up, call
# its init() hook, then hand control to run_from_stdin() (the accompanying
# shell script pipes each data file into this program and redirects stdout).
reader = TrainReader("../config.yaml")
reader.init()
reader.run_from_stdin()
# Build the raw data, then convert every file under train_data/ and
# test_data/ to slot format with get_slot_data.py (stdin -> stdout).
sh create_data.sh

# convert_dir SRC_DIR DST_DIR
# Feed each regular file in SRC_DIR through get_slot_data.py and write the
# result to a file of the same name in DST_DIR.
convert_dir() {
    src_dir=$1
    dst_dir=$2
    # -p: do not fail when the directory is left over from an earlier run.
    mkdir -p "$dst_dir"
    # Glob instead of parsing `ls`, so names with spaces stay intact.
    for path in "$src_dir"/*
    do
        # Skips sub-directories and the unexpanded pattern of an empty dir.
        [ -f "$path" ] || continue
        python get_slot_data.py < "$path" > "$dst_dir/$(basename "$path")"
    done
}

convert_dir ./train_data slot_train_data
convert_dir ./test_data slot_test_data
......@@ -57,12 +57,10 @@ class Model(ModelBase):
return l3
def train_net(self):
wide_input = fluid.data(name='wide_input', shape=[None, 8], dtype='float32')
deep_input = fluid.data(name='deep_input', shape=[None, 58], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='float32')
self._data_var.append(wide_input)
self._data_var.append(deep_input)
self._data_var.append(label)
self.model._init_slots()
wide_input = self._dense_data_var[0]
deep_input = self._dense_data_var[1]
label = self._sparse_data_var[0]
hidden1_units = envs.get_global_env("hyper_parameters.hidden1_units", 75, self._namespace)
hidden2_units = envs.get_global_env("hyper_parameters.hidden2_units", 50, self._namespace)
......@@ -95,7 +93,7 @@ class Model(ModelBase):
self._metrics["BATCH_AUC"] = batch_auc
self._metrics["ACC"] = acc
cost = fluid.layers.sigmoid_cross_entropy_with_logits(x=prediction, label=label)
cost = fluid.layers.sigmoid_cross_entropy_with_logits(x=prediction, label=fluid.layers.cast(label, dtype='float32'))
avg_cost = fluid.layers.mean(cost)
self._cost = avg_cost
......@@ -105,4 +103,5 @@ class Model(ModelBase):
return optimizer
def infer_net(self, parameter_list):
    """Build the inference network.

    Calls ``self.model._init_slots()`` and then ``self.deepfm_net()``.

    Args:
        parameter_list: accepted but not used in this method.
    """
    # NOTE(review): `_init_slots` is defined on the base Model class itself;
    # confirm that `self.model` exists and exposes it.
    self.model._init_slots()
    self.deepfm_net()
......@@ -22,8 +22,9 @@ train:
reader:
batch_size: 2
class: "{workspace}/criteo_reader.py"
train_data_path: "{workspace}/data/train_data"
train_data_path: "{workspace}/data/slot_train_data"
sparse_slots: "label feat_idx"
dense_slots: "feat_value:39"
model:
models: "{workspace}/model.py"
......
......@@ -12,17 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import yaml
from paddlerec.core.reader import Reader
from paddlerec.core.utils import envs
try:
import cPickle as pickle
except ImportError:
import pickle
import paddle.fluid.incubate.data_generator as dg
from paddlerec.core.reader import Reader
class TrainReader(dg.MultiSlotDataGenerator):
def __init__(self, config):
    """Initialize the data generator and check that the reader config loads.

    Args:
        config: path to the YAML reader configuration file.

    Raises:
        ValueError: if ``config`` is not an existing regular file.
    """
    # Imported locally: none of the import lines visible at the top of this
    # file bring `os` into scope, so os.path.isfile would raise NameError.
    import os

    dg.MultiSlotDataGenerator.__init__(self)

    if not os.path.isfile(config):
        raise ValueError("reader config only support yaml")
    with open(config, 'r') as rb:
        # The parsed mapping is not used any further in this constructor;
        # loading it still surfaces a malformed config at construction time.
        yaml.load(rb.read(), Loader=yaml.FullLoader)
class TrainReader(Reader):
def init(self):
    """No-op hook: the body contains only ``pass``."""
    pass
......@@ -39,7 +46,18 @@ class TrainReader(Reader):
def generate_sample(self, line):
    """Return a generator function that converts ``line`` to slot text.

    Args:
        line: one raw input line, parsed by ``self._process_line``.

    Returns:
        The nested ``data_iter`` generator function. When iterated it prints
        every value of every slot as ``name:value``, separated by single
        spaces, on one line of standard output.
    """
    def data_iter():
        feat_idx, feat_value, label = self._process_line(line)
        slots = (('feat_idx', feat_idx), ('feat_value', feat_value), ('label', label))
        # NOTE(review): this yield sits next to the slot-text output in the
        # merge diff and is presumably the removed pre-change line; confirm,
        # and drop it if only the printed slot text is wanted.
        yield list(slots)
        # Join once instead of growing a string with += inside nested loops.
        text = " ".join(
            name + ":" + str(value) for name, values in slots for value in values)
        # Call form of print: the statement form `print s.strip()` is a
        # SyntaxError on Python 3 and under
        # `from __future__ import print_function`.
        print(text.strip())
        # The text line was written directly above; None is yielded in place
        # of a sample.
        yield None

    return data_iter
# Script body: build the reader from the config file one directory up, call
# its init() hook, then hand control to run_from_stdin() (the accompanying
# shell script pipes each data file into this program and redirects stdout).
reader = TrainReader("../config.yaml")
reader.init()
reader.run_from_stdin()
# Download the raw data, then convert every file under train_data/tr and
# test_data/ev to slot format with get_slot_data.py (stdin -> stdout).
python download.py

# convert_dir SRC_DIR DST_DIR
# Feed each regular file in SRC_DIR through get_slot_data.py and write the
# result to a file of the same name in DST_DIR.
convert_dir() {
    src_dir=$1
    dst_dir=$2
    # -p creates missing parents: a plain `mkdir slot_test_data/ev` fails
    # because slot_test_data does not exist yet, and every redirect into it
    # then fails as well.
    mkdir -p "$dst_dir"
    # Glob instead of parsing `ls`, so names with spaces stay intact.
    for path in "$src_dir"/*
    do
        # Skips sub-directories and the unexpanded pattern of an empty dir.
        [ -f "$path" ] || continue
        python get_slot_data.py < "$path" > "$dst_dir/$(basename "$path")"
    done
}

convert_dir ./train_data/tr slot_train_data/tr
convert_dir ./test_data/ev slot_test_data/ev
......@@ -34,10 +34,11 @@ class Model(ModelBase):
# ------------------------- network input --------------------------
num_field = envs.get_global_env("hyper_parameters.num_field", None, self._namespace)
raw_feat_idx = fluid.data(name='feat_idx', shape=[None, num_field], dtype='int64')
raw_feat_value = fluid.data(name='feat_value', shape=[None, num_field], dtype='float32')
self.label = fluid.data(name='label', shape=[None, 1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx, [-1, 1]) # (None * num_field) * 1
raw_feat_idx = self._sparse_data_var[1]
raw_feat_value = self._dense_data_var[0]
self.label = self._sparse_data_var[0]
feat_idx = raw_feat_idx
feat_value = fluid.layers.reshape(raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
feat_embeddings = fluid.embedding(
......@@ -52,15 +53,6 @@ class Model(ModelBase):
[-1, num_field, sparse_feature_dim]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# ------------------------- set _data_var --------------------------
self._data_var.append(raw_feat_idx)
self._data_var.append(raw_feat_value)
self._data_var.append(self.label)
if self._platform != "LINUX":
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
# -------------------- linear --------------------
weights_linear = fluid.embedding(
......@@ -153,9 +145,10 @@ class Model(ModelBase):
self.predict = fluid.layers.sigmoid(y_linear + y_cin + y_dnn)
def train_net(self):
self.model._init_slots()
self.xdeepfm_net()
cost = fluid.layers.log_loss(input=self.predict, label=self.label, epsilon=0.0000001)
cost = fluid.layers.log_loss(input=self.predict, label=fluid.layers.cast(self.label, "float32"), epsilon=0.0000001)
batch_cost = fluid.layers.reduce_mean(cost)
self._cost = batch_cost
......@@ -174,4 +167,5 @@ class Model(ModelBase):
return optimizer
def infer_net(self, parameter_list):
    """Build the inference network.

    Calls ``self.model._init_slots()`` and then ``self.xdeepfm_net()``.

    Args:
        parameter_list: accepted but not used in this method.
    """
    # NOTE(review): `_init_slots` is defined on the base Model class itself;
    # confirm that `self.model` exists and exposes it.
    self.model._init_slots()
    self.xdeepfm_net()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册