Commit 4a5b4c10, authored by zhoushiyu, committed by Thunderbrook

[cherry-pick] update win APIs in paddlerec/ctr (#3903)

* fix APIs, including os.path, shell and multiprocess

* use mul.cpu_count on Windows
Parent 564634b0
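The recurring change in this diff is replacing POSIX-only constructs (shell download scripts, '/'-joined path strings) with portable Python calls such as os.path.join; the commit message's `mul.cpu_count` presumably refers to `multiprocessing.cpu_count()`. Below is a minimal sketch of the pattern; the helper names are illustrative and not taken from the diff.

```python
import os
import multiprocessing as mul  # one reading of the bullet "use mul.cpu_count in win"


def epoch_model_dir(output_dir, epoch_id):
    # os.path.join picks the correct separator on both Windows and Linux,
    # unlike output_dir + '/epoch_' + str(epoch_id).
    return os.path.join(output_dir, 'epoch_' + str(epoch_id))


def default_thread_num():
    # A shell call such as `nproc` is unavailable on Windows;
    # multiprocessing.cpu_count() works on either platform.
    return mul.cpu_count()
```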
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import numpy as np
@@ -22,12 +21,12 @@ import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
-logging.basicConfig(
-    format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
num_context_feature = 22
def parse_args():
    parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example")
    parser.add_argument(
@@ -59,6 +58,7 @@ def parse_args():
    return parser.parse_args()
def to_lodtensor(data, place):
    seq_lens = [len(seq) for seq in data]
    cur_len = 0
@@ -72,7 +72,6 @@ def to_lodtensor(data, place):
    res.set(flattened_data, place)
    res.set_lod([lod])
    return res
@@ -91,7 +90,8 @@ def data2tensor(data, place):
        sparse_data = to_lodtensor([x[1 + i] for x in data], place)
        feed_dict["context" + str(i)] = sparse_data
-    context_fm = to_lodtensor(np.array([x[-2] for x in data]).astype("float32"), place)
+    context_fm = to_lodtensor(
+        np.array([x[-2] for x in data]).astype("float32"), place)
    feed_dict["context_fm"] = context_fm
    y_data = np.array([x[-1] for x in data]).astype("int64")
@@ -99,6 +99,7 @@ def data2tensor(data, place):
    feed_dict["label"] = y_data
    return feed_dict
def test():
    args = parse_args()
@@ -112,13 +113,14 @@ def test():
    exe = fluid.Executor(place)
    whole_filelist = ["./out/normed_test_session.txt"]
-    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))]
+    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(
+        whole_filelist))]
    epochs = 1
    for i in range(epochs):
-        cur_model_path = args.model_path + "/epoch" + str(1) + ".model"
+        cur_model_path = os.path.join(args.model_path,
+                                      "epoch" + str(1) + ".model")
        with open("./testres/res" + str(i), 'w') as r:
            with fluid.scope_guard(test_scope):
                [inference_program, feed_target_names, fetch_targets] = \
@@ -129,9 +131,11 @@ def test():
                for batch_id, data in enumerate(test_reader()):
                    print(len(data[0]))
                    feed_dict = data2tensor(data, place)
-                    loss_val, auc_val, accuracy, predict, _ = exe.run(inference_program,
-                        feed=feed_dict,
-                        fetch_list=fetch_targets, return_numpy=False)
+                    loss_val, auc_val, accuracy, predict, _ = exe.run(
+                        inference_program,
+                        feed=feed_dict,
+                        fetch_list=fetch_targets,
+                        return_numpy=False)
                    x = np.array(predict)
                    for j in range(x.shape[0]):
...
@@ -12,8 +12,7 @@ import paddle.fluid as fluid
import map_reader
from network_conf import ctr_deepfm_dataset
-logging.basicConfig(
-    format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
@@ -91,15 +90,20 @@ def infer():
    place = fluid.CPUPlace()
    inference_scope = fluid.core.Scope()
-    filelist = ["%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path)]
+    filelist = [
+        "%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path)
+    ]
    from map_reader import MapDataset
    map_dataset = MapDataset()
    map_dataset.setup(args.sparse_feature_dim)
    exe = fluid.Executor(place)
-    whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
+    whole_filelist = [
+        "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))
+    ]
    #whole_filelist = ["./out/normed_train09", "./out/normed_train10", "./out/normed_train11"]
-    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))]
+    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(
+        whole_filelist))]
    # file_groups = [whole_filelist[i:i+train_thread_num] for i in range(0, len(whole_filelist), train_thread_num)]
@@ -110,7 +114,8 @@ def infer():
    epochs = 2
    for i in range(epochs):
-        cur_model_path = args.model_path + "/epoch" + str(i + 1) + ".model"
+        cur_model_path = os.path.join(args.model_path,
+                                      "epoch" + str(i + 1) + ".model")
        with fluid.scope_guard(inference_scope):
            [inference_program, feed_target_names, fetch_targets] = \
                fluid.io.load_inference_model(cur_model_path, exe)
@@ -120,9 +125,11 @@ def infer():
            test_reader = map_dataset.infer_reader(test_files, 1000, 100000)
            for batch_id, data in enumerate(test_reader()):
-                loss_val, auc_val, accuracy, predict, label = exe.run(inference_program,
-                    feed=data2tensor(data, place),
-                    fetch_list=fetch_targets, return_numpy=False)
+                loss_val, auc_val, accuracy, predict, label = exe.run(
+                    inference_program,
+                    feed=data2tensor(data, place),
+                    fetch_list=fetch_targets,
+                    return_numpy=False)
                #print(np.array(predict))
                #x = np.array(predict)
...
@@ -6,33 +6,36 @@ import paddle.fluid as fluid
import sys
from network_confv6 import ctr_deepfm_dataset
NUM_CONTEXT_FEATURE = 22
DIM_USER_PROFILE = 10
DIM_DENSE_FEATURE = 3
PYTHON_PATH = "/home/yaoxuefeng/whls/paddle_release_home/python/bin/python"  # this is mine change yours
def train():
    args = parse_args()
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)
    #set the input format for our model. Note that you need to carefully modify them when you define a new network
    #user_profile = fluid.layers.data(
    #name="user_profile", shape=[DIM_USER_PROFILE], dtype='int64', lod_level=1)
    dense_feature = fluid.layers.data(
        name="dense_feature", shape=[DIM_DENSE_FEATURE], dtype='float32')
    context_feature = [
-        fluid.layers.data(name="context" + str(i), shape=[1], lod_level=1, dtype="int64")
-        for i in range(0, NUM_CONTEXT_FEATURE)]
+        fluid.layers.data(
+            name="context" + str(i), shape=[1], lod_level=1, dtype="int64")
+        for i in range(0, NUM_CONTEXT_FEATURE)
+    ]
    context_feature_fm = fluid.layers.data(
        name="context_fm", shape=[1], dtype='int64', lod_level=1)
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    print("ready to network")
    #self define network
-    loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset(dense_feature, context_feature, context_feature_fm, label,
-        args.embedding_size, args.sparse_feature_dim)
+    loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset(
+        dense_feature, context_feature, context_feature_fm, label,
+        args.embedding_size, args.sparse_feature_dim)
    print("ready to optimize")
    optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
@@ -42,7 +45,8 @@ def train():
    exe.run(fluid.default_startup_program())
    #use dataset api for much faster speed
    dataset = fluid.DatasetFactory().create_dataset()
-    dataset.set_use_var([dense_feature] + context_feature + [context_feature_fm] + [label])
+    dataset.set_use_var([dense_feature] + context_feature +
+                        [context_feature_fm] + [label])
    #self define how to process generated training insatnces in map_reader.py
    pipe_command = PYTHON_PATH + " map_reader.py %d" % args.sparse_feature_dim
    dataset.set_pipe_command(pipe_command)
@@ -50,27 +54,36 @@ def train():
    thread_num = 1
    dataset.set_thread(thread_num)
    #self define how to split training files for example:"split -a 2 -d -l 200000 normed_train.txt normed_train"
-    whole_filelist = ["./out/normed_train%d" % x for x in range(len(os.listdir("out")))]
-    whole_filelist = ["./out/normed_train00", "./out/normed_train01", "./out/normed_train02", "./out/normed_train03",
-                      "./out/normed_train04", "./out/normed_train05", "./out/normed_train06", "./out/normed_train07",
-                      "./out/normed_train08",
-                      "./out/normed_train09", "./out/normed_train10", "./out/normed_train11"]
+    whole_filelist = [
+        "./out/normed_train%d" % x for x in range(len(os.listdir("out")))
+    ]
+    whole_filelist = [
+        "./out/normed_train00", "./out/normed_train01", "./out/normed_train02",
+        "./out/normed_train03", "./out/normed_train04", "./out/normed_train05",
+        "./out/normed_train06", "./out/normed_train07", "./out/normed_train08",
+        "./out/normed_train09", "./out/normed_train10", "./out/normed_train11"
+    ]
    print("ready to epochs")
    epochs = 10
    for i in range(epochs):
        print("start %dth epoch" % i)
        dataset.set_filelist(whole_filelist[:int(len(whole_filelist))])
        #print the informations you want by setting fetch_list and fetch_info
-        exe.train_from_dataset(program=fluid.default_main_program(),
-                               dataset=dataset,
-                               fetch_list=[auc_var, accuracy, predict, label],
-                               fetch_info=["auc", "accuracy", "predict", "label"],
-                               debug=False)
-        model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
+        exe.train_from_dataset(
+            program=fluid.default_main_program(),
+            dataset=dataset,
+            fetch_list=[auc_var, accuracy, predict, label],
+            fetch_info=["auc", "accuracy", "predict", "label"],
+            debug=False)
+        model_dir = os.path.join(args.model_output_dir,
+                                 '/epoch' + str(i + 1) + ".model")
        sys.stderr.write("epoch%d finished" % (i + 1))
        #save model
-        fluid.io.save_inference_model(model_dir, [dense_feature.name] + [x.name for x in context_feature] + [context_feature_fm.name] + [label.name],
-                                      [loss, auc_var, accuracy, predict, label], exe)
+        fluid.io.save_inference_model(
+            model_dir,
+            [dense_feature.name] + [x.name for x in context_feature] +
+            [context_feature_fm.name] + [label.name],
+            [loss, auc_var, accuracy, predict, label], exe)
if __name__ == '__main__':
...
@@ -32,7 +32,7 @@ The DCN model is described in the paper [Deep & Cross Network for Ad Click Predictions]
Data download command:
```bash
-cd data && sh download.sh
+cd data && python download.py
```
## Data preprocessing
@@ -70,13 +70,14 @@ loss: [0.44703564] auc_val: [0.80654419]
## Distributed training
First, download and preprocess the small demo dataset:
```bash
-cd dist_data && sh dist_download.sh && cd ..
+cd dist_data && python dist_download.py && cd ..
```
Run the command below to simulate multi-machine training locally. By default a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers.
**Note: for distributed training, Paddle 1.6 or above, or the [latest build](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev), is recommended.**
```bash
+# this shell script does not support Windows
sh cluster_train.sh
```
Parameter description:
@@ -102,7 +103,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --test_valid_da
- Trainer 0 saves the model parameters
- After each training run, the pserver processes must be stopped manually; list them with:
>ps -ef | grep python
- Data is read in dataset mode, which is currently only supported on Linux
@@ -162,7 +162,8 @@ def train():
            fetch_info=['total_loss', 'avg_logloss', 'auc'],
            debug=False,
            print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        if args.trainer_id == 0:  # only trainer 0 save model
...
+#!/usr/bin/env python
+# coding: utf-8
import argparse
"""
global params
...
import os
import sys
import io
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
trainfile = 'train.txt'
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url)
print("download and extract finished")
count = 0
for _ in io.open(trainfile, 'r', encoding='utf-8'):
count += 1
print("total records: %d" % count)
print("done")
#!/bin/bash
workdir=$(cd $(dirname $0); pwd)
cd $workdir
trainfile='train.txt'
echo "data dir:" ${workdir}
cd $workdir
echo "download data starting..."
wget --no-check-certificate -c https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
echo "download finished"
echo "extracting ..."
tar xzf dac.tar.gz >/dev/null 2>&1
wc -l $trainfile | awk '{print $1}' > line_nums.log
echo "extract finished"
echo "total records: "`cat line_nums.log`
echo "done"
+#!/usr/bin/env python
+# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
@@ -19,7 +17,6 @@ VOCAB_DIR = 'vocab'
TRAIN_DIR = 'train'
TEST_VALID_DIR = 'test_valid'
SPLIT_RATIO = 0.9
-LINE_NUMS = "line_nums.log"
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
@@ -113,11 +110,13 @@ def split_data():
            fout.close()
            data_dir = TEST_VALID_DIR
            cur_part_idx = int(line_idx / 200000)
-            fout = open(data_dir + '/part-' + str(cur_part_idx), 'w')
+            fout = open(
+                os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w')
        if line_idx % 200000 == 0 and line_idx != 0:
            fout.close()
            cur_part_idx = int(line_idx / 200000)
-            fout = open(data_dir + '/part-' + str(cur_part_idx), 'w')
+            fout = open(
+                os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w')
        fout.write(line)
    fout.close()
    fin.close()
...
from __future__ import print_function
import os
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url, savename="dist_data_demo.tar.gz")
print("download and extract finished")
print("preprocessing...")
os.system("python dist_preprocess.py")
print("preprocess done")
\ No newline at end of file
#!/bin/bash
# download small demo dataset
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz
tar xzvf dist_data_demo.tar.gz
# preprocess demo dataset
python dist_preprocess.py
+#!/usr/bin/env python
+# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
@@ -21,7 +19,6 @@ TEST_DIR = 'dist_test_valid_data'
TRAIN_FILE = os.path.join(TRAIN_DIR, 'tr')
TEST_FILE = os.path.join(TEST_DIR, 'ev')
SPLIT_RATIO = 0.9
-LINE_NUMS = "line_nums.log"
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
...
+#!/usr/bin/env python
+# coding: utf-8
import logging
import random
@@ -46,7 +44,8 @@ def infer():
    startup_program = fluid.framework.Program()
    test_program = fluid.framework.Program()
-    cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
+    cur_model_path = os.path.join(args.model_output_dir,
+                                  'epoch_' + args.test_epoch)
    with fluid.scope_guard(inference_scope):
        with fluid.framework.program_guard(test_program, startup_program):
...
+#!/usr/bin/env python
+# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
import random
@@ -75,7 +73,8 @@ def train(args):
            fetch_info=['total_loss', 'avg_logloss', 'auc'],
            debug=False,
            print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        fluid.io.save_persistables(
...
+#!/usr/bin/env python
+# coding: utf-8
from __future__ import print_function, absolute_import, division
import paddle.fluid as fluid
from collections import OrderedDict
...
+#!/usr/bin/env python
+# coding: utf-8
"""
dataset and reader
"""
...
@@ -25,7 +25,7 @@ To preprocess the raw dataset, we min-max normalize continuous features to [0, 1
Download and preprocess data:
```bash
-cd data && sh download_preprocess.sh && cd ..
+cd data && python download_preprocess.py && cd ..
```
After executing these commands, 3 folders "train_data", "test_data" and "aid_data" will be generated. The folder "train_data" contains 90% of the raw data, while the rest 10% is in "test_data". The folder "aid_data" contains a created feature dictionary "feat_dict.pkl2".
@@ -58,12 +58,13 @@ We emulate distributed training on a local machine. In default, we use 2 X 2,i
### Download and preprocess distributed demo dataset
This small demo dataset(a few lines from Criteo dataset) only test if distributed training can train.
```bash
-cd dist_data && sh dist_data_download.sh && cd ..
+cd dist_data && python dist_data_download.py && cd ..
```
### Distributed Train and Infer
Train
```bash
+# this shell script does not support Windows
sh cluster_train.sh
```
params of cluster_train.sh:
@@ -89,7 +90,7 @@ Notes:
- The first trainer(with trainer_id 0) saves model params.
- After each training, pserver processes should be stop manually. You can use command below:
>ps -ef | grep python
- We use Dataset API to load data,it's only supported on Linux now.
...
@@ -133,7 +133,7 @@ def train():
    dataset.set_batch_size(args.batch_size)
    dataset.set_thread(args.num_thread)
    train_filelist = [
-        args.train_data_dir + '/' + x
+        os.path.join(args.train_data_dir, x)
        for x in os.listdir(args.train_data_dir)
    ]
@@ -156,7 +156,8 @@ def train():
            fetch_info=['epoch %d batch loss' % (epoch_id + 1)],
            print_period=5,
            debug=False)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        if args.trainer_id == 0:  # only trainer 0 save model
...
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2"
print("download and extract starting...")
download_file_and_uncompress(url)
download_file(url2, "./aid_data/feat_dict_10.pkl2", True)
print("download and extract finished")
print("preprocessing...")
os.system("python preprocess.py")
print("preprocess done")
shutil.rmtree("raw_data")
print("done")
#!/bin/bash
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2 -O ./aid_data/feat_dict_10.pkl2 || rm -f ./aid_data/feat_dict_10.pkl2
tar zxf dac.tar.gz >/dev/null 2>&1
rm -f dac.tar.gz
python preprocess.py
rm *.txt
rm -r raw_data
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url, savename="dist_data_demo.tar.gz")
print("download and extract finished")
print("preprocessing...")
os.system("python preprocess_dist.py")
print("preprocess done")
print("done")
\ No newline at end of file
#!/bin/bash
# download small demo dataset
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz
tar xzvf dist_data_demo.tar.gz
# preprocess dataset
python preprocess_dist.py
@@ -25,7 +25,8 @@ def infer():
    inference_scope = fluid.Scope()
    test_files = [
-        args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir)
+        os.path.join(args.test_data_dir, x)
+        for x in os.listdir(args.test_data_dir)
    ]
    criteo_dataset = CriteoDataset()
    criteo_dataset.setup(args.feat_dict)
@@ -34,7 +35,8 @@ def infer():
    startup_program = fluid.framework.Program()
    test_program = fluid.framework.Program()
-    cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
+    cur_model_path = os.path.join(args.model_output_dir,
+                                  'epoch_' + args.test_epoch)
    with fluid.scope_guard(inference_scope):
        with fluid.framework.program_guard(test_program, startup_program):
...
@@ -36,7 +36,8 @@ def train():
    dataset.set_batch_size(args.batch_size)
    dataset.set_thread(args.num_thread)
    train_filelist = [
-        args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir)
+        os.path.join(args.train_data_dir, x)
+        for x in os.listdir(args.train_data_dir)
    ]
    print('---------------------------------------------')
@@ -50,7 +51,8 @@ def train():
            fetch_info=['epoch %d batch loss' % (epoch_id + 1)],
            print_period=1000,
            debug=False)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        fluid.io.save_persistables(
...
@@ -29,7 +29,7 @@ pip install -r requirements.txt
Download the dataset:
```bash
-cd data && ./download.sh && cd ..
+cd data && python download.py && cd ..
```
## Model
@@ -56,6 +56,7 @@ python train.py \
Start a local distributed training job with 2 trainers and 2 pservers. In the distributed setting, the training data is split by trainer id so that it does not overlap between trainers, which improves training efficiency.
```bash
+# this shell script does not support Windows
sh cluster_train.sh
```
...
@@ -39,7 +39,7 @@ categorical features. For the test dataset, the labels are omitted.
Download dataset:
```bash
-cd data && ./download.sh && cd ..
+cd data && python download.py && cd ..
```
## Model
@@ -73,6 +73,7 @@ In distributed training setting, training data is splited by trainer_id, so that
do not overlap among trainers
```bash
+# this shell not support Windows
sh cluster_train.sh
```
...
import os
import shutil
import sys
import glob
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url)
print("download and extract finished")
if os.path.exists("raw"):
shutil.rmtree("raw")
os.mkdir("raw")
# mv ./*.txt raw/
files = glob.glob("*.txt")
for f in files:
shutil.move(f, "raw")
print("done")
@@ -173,8 +173,8 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
                    .format(pass_id, batch_id, loss_val / args.batch_size,
                            auc_val, batch_auc_val))
                if batch_id % 1000 == 0 and batch_id != 0:
-                    model_dir = args.model_output_dir + '/batch-' + str(
-                        batch_id)
+                    model_dir = os.path.join(args.model_output_dir,
+                                             'batch-' + str(batch_id))
                    if args.trainer_id == 0:
                        fluid.io.save_persistables(
                            executor=exe,
@@ -188,7 +188,7 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
            total_time += time.time() - pass_start
-            model_dir = args.model_output_dir + '/pass-' + str(pass_id)
+            model_dir = os.path.join(args.model_output_dir, 'pass-' + str(pass_id))
            if args.trainer_id == 0:
                fluid.io.save_persistables(
                    executor=exe,
...
import os
import time
import shutil
import requests
import sys
import tarfile
import zipfile
import platform
import functools
lasttime = time.time()
FLUSH_INTERVAL = 0.1
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
def get_platform():
return platform.platform()
def is_windows():
return get_platform().lower().startswith("windows")
def progress(str, end=False):
global lasttime
if end:
str += "\n"
lasttime = 0
if time.time() - lasttime >= FLUSH_INTERVAL:
sys.stdout.write("\r%s" % str)
lasttime = time.time()
sys.stdout.flush()
def download_file(url, savepath, print_progress):
r = requests.get(url, stream=True)
total_length = r.headers.get('content-length')
if total_length is None:
with open(savepath, 'wb') as f:
shutil.copyfileobj(r.raw, f)
else:
with open(savepath, 'wb') as f:
dl = 0
total_length = int(total_length)
starttime = time.time()
if print_progress:
print("Downloading %s" % os.path.basename(savepath))
for data in r.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
if print_progress:
done = int(50 * dl / total_length)
progress("[%-50s] %.2f%%" %
('=' * done, float(100 * dl) / total_length))
if print_progress:
progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)
def _uncompress_file(filepath, extrapath, delete_file, print_progress):
if print_progress:
print("Uncompress %s" % os.path.basename(filepath))
if filepath.endswith("zip"):
handler = _uncompress_file_zip
elif filepath.endswith("tgz"):
handler = _uncompress_file_tar
else:
handler = functools.partial(_uncompress_file_tar, mode="r")
for total_num, index, rootpath in handler(filepath, extrapath):
if print_progress:
done = int(50 * float(index) / total_num)
progress("[%-50s] %.2f%%" %
('=' * done, float(100 * index) / total_num))
if print_progress:
progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)
if delete_file:
os.remove(filepath)
return rootpath
def _uncompress_file_zip(filepath, extrapath):
files = zipfile.ZipFile(filepath, 'r')
filelist = files.namelist()
rootpath = filelist[0]
total_num = len(filelist)
for index, file in enumerate(filelist):
files.extract(file, extrapath)
yield total_num, index, rootpath
files.close()
yield total_num, index, rootpath
def _uncompress_file_tar(filepath, extrapath, mode="r:gz"):
files = tarfile.open(filepath, mode)
filelist = files.getnames()
total_num = len(filelist)
rootpath = filelist[0]
for index, file in enumerate(filelist):
files.extract(file, extrapath)
yield total_num, index, rootpath
files.close()
yield total_num, index, rootpath
def download_file_and_uncompress(url,
savepath=None,
savename=None,
extrapath=None,
print_progress=True,
cover=False,
delete_file=False):
if savepath is None:
savepath = "."
if extrapath is None:
extrapath = "."
if savename is None:
savename = url.split("/")[-1]
savepath = os.path.join(savepath, savename)
if cover:
if os.path.exists(savepath):
shutil.rmtree(savepath)
if not os.path.exists(savepath):
download_file(url, savepath, print_progress)
_ = _uncompress_file(savepath, extrapath, delete_file, print_progress)
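For reference, a minimal usage sketch of the two helpers defined above, mirroring the calls made by the new download scripts in this commit (the URLs and target paths are copied from those scripts; the ./aid_data directory is assumed to already exist, as in the original script):

```python
from tools import download_file_and_uncompress, download_file

if __name__ == '__main__':
    url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
    # Download dac.tar.gz into the current directory and unpack it there;
    # the archive is kept because delete_file defaults to False.
    download_file_and_uncompress(url)
    # Plain download with a progress bar, no extraction; assumes ./aid_data exists.
    download_file("https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2",
                  "./aid_data/feat_dict_10.pkl2", True)
```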
@@ -8,7 +8,7 @@
For the demo dataset, run the command in the data directory to download the data:
```bash
-sh download.sh
+python download.py
```
## Environment
@@ -39,6 +39,7 @@ test_epoch is set to load the model from the 10th training epoch.
Data download is the same as the command above.
```bash
+# this shell script does not support Windows
sh cluster_train.sh
```
Parameter description:
@@ -64,7 +65,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --use_gpu=0
- Trainer 0 saves the model parameters
- After each training run, the pserver processes must be stopped manually; list them with:
>ps -ef | grep python
- Data is read in dataset mode, which is currently only supported on Linux
@@ -139,7 +139,7 @@ def train():
    dataset.set_pipe_command('python criteo_reader.py')
    dataset.set_batch_size(args.batch_size)
    dataset.set_filelist([
-        args.train_data_dir + '/' + x
+        os.path.join(args.train_data_dir, x)
        for x in os.listdir(args.train_data_dir)
    ])
@@ -161,7 +161,8 @@ def train():
            fetch_info=['loss', 'auc'],
            debug=False,
            print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        if args.trainer_id == 0:  # only trainer 0 save model
...
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
url_train = "https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr"
url_test = "https://paddlerec.bj.bcebos.com/xdeepfm%2Fev"
train_dir = "train_data"
test_dir = "test_data"
if not os.path.exists(train_dir):
os.mkdir(train_dir)
if not os.path.exists(test_dir):
os.mkdir(test_dir)
print("download and extract starting...")
download_file(url_train, "./train_data/tr", True)
download_file(url_test, "./test_data/ev", True)
print("download and extract finished")
print("done")
#!/bin/bash
if [ ! -d "train_data" ]; then
mkdir train_data
fi
if [ ! -d "test_data" ]; then
mkdir test_data
fi
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Fev -O ./test_data/ev
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr -O ./train_data/tr
@@ -26,7 +26,8 @@ def infer():
    inference_scope = fluid.Scope()
    test_files = [
-        args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir)
+        os.path.join(args.test_data_dir, x)
+        for x in os.listdir(args.test_data_dir)
    ]
    criteo_dataset = CriteoDataset()
    test_reader = paddle.batch(
@@ -34,7 +35,8 @@ def infer():
    startup_program = fluid.framework.Program()
    test_program = fluid.framework.Program()
-    cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
+    cur_model_path = os.path.join(args.model_output_dir,
+                                  'epoch_' + args.test_epoch)
    with fluid.scope_guard(inference_scope):
        with fluid.framework.program_guard(test_program, startup_program):
...
@@ -4,6 +4,7 @@ import paddle.fluid as fluid
import sys
import network_conf
import time
+import utils
def train():
@@ -25,7 +26,8 @@ def train():
    dataset.set_pipe_command('python criteo_reader.py')
    dataset.set_batch_size(args.batch_size)
    dataset.set_filelist([
-        args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir)
+        os.path.join(args.train_data_dir, x)
+        for x in os.listdir(args.train_data_dir)
    ])
    if args.use_gpu == 1:
@@ -46,7 +48,8 @@ def train():
            fetch_info=['loss', 'auc'],
            debug=False,
            print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
        sys.stderr.write('epoch%d is finished and takes %f s\n' % (
            (epoch_id + 1), time.time() - start))
        fluid.io.save_persistables(
...