From 4a5b4c10bcbc7f4cee7f0620999d40fc01531c2c Mon Sep 17 00:00:00 2001 From: zhoushiyu <31816202+wilhelmzh@users.noreply.github.com> Date: Mon, 11 Nov 2019 14:00:17 +0800 Subject: [PATCH] [cherry-pick]update win apis in paddlerec/ctr (#3903) * fix apis, including os.path, shell and multiprocess * use mul.cpu_count in win --- .../Paddle_baseline_KDD2019/generate_test.py | 26 ++-- .../ctr/Paddle_baseline_KDD2019/infer.py | 25 ++-- .../Paddle_baseline_KDD2019/local_train.py | 57 +++++--- PaddleRec/ctr/dcn/README.md | 9 +- PaddleRec/ctr/dcn/cluster_train.py | 3 +- PaddleRec/ctr/dcn/config.py | 2 - PaddleRec/ctr/dcn/data/download.py | 24 ++++ PaddleRec/ctr/dcn/data/download.sh | 23 --- PaddleRec/ctr/dcn/data/preprocess.py | 9 +- PaddleRec/ctr/dcn/dist_data/dist_download.py | 19 +++ PaddleRec/ctr/dcn/dist_data/dist_download.sh | 7 - .../ctr/dcn/dist_data/dist_preprocess.py | 3 - PaddleRec/ctr/dcn/infer.py | 5 +- PaddleRec/ctr/dcn/local_train.py | 5 +- PaddleRec/ctr/dcn/network.py | 2 - PaddleRec/ctr/dcn/reader.py | 2 - PaddleRec/ctr/deepfm/README.md | 7 +- PaddleRec/ctr/deepfm/cluster_train.py | 5 +- .../ctr/deepfm/data/download_preprocess.py | 25 ++++ .../ctr/deepfm/data/download_preprocess.sh | 10 -- .../deepfm/dist_data/dist_data_download.py | 22 +++ .../deepfm/dist_data/dist_data_download.sh | 7 - PaddleRec/ctr/deepfm/infer.py | 6 +- PaddleRec/ctr/deepfm/local_train.py | 6 +- PaddleRec/ctr/dnn/README.cn.md | 3 +- PaddleRec/ctr/dnn/README.md | 3 +- PaddleRec/ctr/dnn/data/download.py | 28 ++++ PaddleRec/ctr/dnn/train.py | 6 +- PaddleRec/ctr/tools/tools.py | 133 ++++++++++++++++++ PaddleRec/ctr/xdeepfm/README.md | 5 +- PaddleRec/ctr/xdeepfm/cluster_train.py | 5 +- PaddleRec/ctr/xdeepfm/data/download.py | 28 ++++ PaddleRec/ctr/xdeepfm/data/download.sh | 12 -- PaddleRec/ctr/xdeepfm/infer.py | 6 +- PaddleRec/ctr/xdeepfm/local_train.py | 7 +- 35 files changed, 397 insertions(+), 148 deletions(-) create mode 100644 PaddleRec/ctr/dcn/data/download.py delete mode 100755 PaddleRec/ctr/dcn/data/download.sh create mode 100644 PaddleRec/ctr/dcn/dist_data/dist_download.py delete mode 100755 PaddleRec/ctr/dcn/dist_data/dist_download.sh create mode 100644 PaddleRec/ctr/deepfm/data/download_preprocess.py delete mode 100644 PaddleRec/ctr/deepfm/data/download_preprocess.sh create mode 100644 PaddleRec/ctr/deepfm/dist_data/dist_data_download.py delete mode 100644 PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh create mode 100644 PaddleRec/ctr/dnn/data/download.py create mode 100644 PaddleRec/ctr/tools/tools.py create mode 100644 PaddleRec/ctr/xdeepfm/data/download.py delete mode 100644 PaddleRec/ctr/xdeepfm/data/download.sh diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py index 8c39950f..66bf13d2 100644 --- a/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py +++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
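The changes below follow two recurring patterns: path strings built with `+ '/'` are rewritten with `os.path.join`, and the `*.sh` helper scripts are replaced by Python equivalents, so the CTR models can be trained on Windows as well as Linux. A minimal sketch of the path rule being applied (illustrative names only, not part of the diff):

```python
import os

# POSIX-only: hard-codes '/' as the separator
model_dir = "models" + "/epoch_" + str(3)

# portable: os.path.join inserts the platform separator ('\\' on Windows)
model_dir = os.path.join("models", "epoch_" + str(3))

# caveat: a component with a leading slash resets the join, so
# os.path.join("models", "/epoch_3") returns "/epoch_3" on POSIX;
# joined components must not begin with '/'
```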
- import argparse import logging import numpy as np @@ -22,12 +21,12 @@ import os os.environ["CUDA_VISIBLE_DEVICES"] = "" import paddle import paddle.fluid as fluid -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) num_context_feature = 22 + def parse_args(): parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example") parser.add_argument( @@ -59,6 +58,7 @@ def parse_args(): return parser.parse_args() + def to_lodtensor(data, place): seq_lens = [len(seq) for seq in data] cur_len = 0 @@ -72,7 +72,6 @@ def to_lodtensor(data, place): res.set(flattened_data, place) res.set_lod([lod]) - return res @@ -91,7 +90,8 @@ def data2tensor(data, place): sparse_data = to_lodtensor([x[1 + i] for x in data], place) feed_dict["context" + str(i)] = sparse_data - context_fm = to_lodtensor(np.array([x[-2] for x in data]).astype("float32"), place) + context_fm = to_lodtensor( + np.array([x[-2] for x in data]).astype("float32"), place) feed_dict["context_fm"] = context_fm y_data = np.array([x[-1] for x in data]).astype("int64") @@ -99,6 +99,7 @@ def data2tensor(data, place): feed_dict["label"] = y_data return feed_dict + def test(): args = parse_args() @@ -112,13 +113,14 @@ def test(): exe = fluid.Executor(place) whole_filelist = ["./out/normed_test_session.txt"] - test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))] - + test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len( + whole_filelist))] epochs = 1 for i in range(epochs): - cur_model_path = args.model_path + "/epoch" + str(1) + ".model" + cur_model_path = os.path.join(args.model_path, + "epoch" + str(1) + ".model") with open("./testres/res" + str(i), 'w') as r: with fluid.scope_guard(test_scope): [inference_program, feed_target_names, fetch_targets] = \ @@ -129,9 +131,11 @@ def test(): for batch_id, data in enumerate(test_reader()): print(len(data[0])) feed_dict = data2tensor(data, place) - loss_val, auc_val, accuracy, predict, _ = exe.run(inference_program, - feed=feed_dict, - fetch_list=fetch_targets, return_numpy=False) + loss_val, auc_val, accuracy, predict, _ = exe.run( + inference_program, + feed=feed_dict, + fetch_list=fetch_targets, + return_numpy=False) x = np.array(predict) for j in range(x.shape[0]): diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py index 5c58ecf1..c218ce0f 100644 --- a/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py +++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py @@ -12,8 +12,7 @@ import paddle.fluid as fluid import map_reader from network_conf import ctr_deepfm_dataset -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) @@ -91,15 +90,20 @@ def infer(): place = fluid.CPUPlace() inference_scope = fluid.core.Scope() - filelist = ["%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path)] + filelist = [ + "%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path) + ] from map_reader import MapDataset map_dataset = MapDataset() map_dataset.setup(args.sparse_feature_dim) exe = fluid.Executor(place) - whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))] + whole_filelist = [ + "raw_data/part-%d" % x for x in 
range(len(os.listdir("raw_data")))
+    ]
     #whole_filelist = ["./out/normed_train09", "./out/normed_train10", "./out/normed_train11"]
-    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))]
+    test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(
+        whole_filelist))]
 
     # file_groups = [whole_filelist[i:i+train_thread_num] for i in range(0, len(whole_filelist), train_thread_num)]
@@ -110,7 +114,8 @@ def infer():
 
     epochs = 2
     for i in range(epochs):
-        cur_model_path = args.model_path + "/epoch" + str(i + 1) + ".model"
+        cur_model_path = os.path.join(args.model_path,
+                                      "epoch" + str(i + 1) + ".model")
         with fluid.scope_guard(inference_scope):
             [inference_program, feed_target_names, fetch_targets] = \
                 fluid.io.load_inference_model(cur_model_path, exe)
@@ -120,9 +125,11 @@ def infer():
 
             test_reader = map_dataset.infer_reader(test_files, 1000, 100000)
             for batch_id, data in enumerate(test_reader()):
-                loss_val, auc_val, accuracy, predict, label = exe.run(inference_program,
-                                                                      feed=data2tensor(data, place),
-                                                                      fetch_list=fetch_targets, return_numpy=False)
+                loss_val, auc_val, accuracy, predict, label = exe.run(
+                    inference_program,
+                    feed=data2tensor(data, place),
+                    fetch_list=fetch_targets,
+                    return_numpy=False)
 
                 #print(np.array(predict))
                 #x = np.array(predict)
diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py
index 62241f2b..9d7e9452 100644
--- a/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py
+++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py
@@ -6,33 +6,36 @@ import paddle.fluid as fluid
 import sys
 from network_confv6 import ctr_deepfm_dataset
 
-
 NUM_CONTEXT_FEATURE = 22
 DIM_USER_PROFILE = 10
 DIM_DENSE_FEATURE = 3
-PYTHON_PATH = "/home/yaoxuefeng/whls/paddle_release_home/python/bin/python" # this is mine change yours
+PYTHON_PATH = "/home/yaoxuefeng/whls/paddle_release_home/python/bin/python"  # this is my path, change it to yours
+
 
 def train():
     args = parse_args()
     if not os.path.isdir(args.model_output_dir):
         os.mkdir(args.model_output_dir)
-    
+
+    #set the input format for our model. Note that you need to carefully modify them when you define a new network
     #user_profile = fluid.layers.data(
-    #name="user_profile", shape=[DIM_USER_PROFILE], dtype='int64', lod_level=1)
+    #name="user_profile", shape=[DIM_USER_PROFILE], dtype='int64', lod_level=1)
     dense_feature = fluid.layers.data(
         name="dense_feature", shape=[DIM_DENSE_FEATURE], dtype='float32')
     context_feature = [
-        fluid.layers.data(name="context" + str(i), shape=[1], lod_level=1, dtype="int64")
-        for i in range(0, NUM_CONTEXT_FEATURE)]
+        fluid.layers.data(
+            name="context" + str(i), shape=[1], lod_level=1, dtype="int64")
+        for i in range(0, NUM_CONTEXT_FEATURE)
+    ]
     context_feature_fm = fluid.layers.data(
         name="context_fm", shape=[1], dtype='int64', lod_level=1)
     label = fluid.layers.data(name='label', shape=[1], dtype='int64')
     print("ready to network")
     #self-defined network
-    loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset(dense_feature, context_feature, context_feature_fm, label,
-                                                                         args.embedding_size, args.sparse_feature_dim)
+    loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset(
+        dense_feature, context_feature, context_feature_fm, label,
+        args.embedding_size, args.sparse_feature_dim)
 
     print("ready to optimize")
     optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
@@ -42,7 +45,8 @@ def train():
     exe.run(fluid.default_startup_program())
     #use dataset api for much faster speed
     dataset = fluid.DatasetFactory().create_dataset()
-    dataset.set_use_var([dense_feature] + context_feature + [context_feature_fm] + [label])
+    dataset.set_use_var([dense_feature] + context_feature +
+                        [context_feature_fm] + [label])
     #self-define how to process generated training instances in map_reader.py
     pipe_command = PYTHON_PATH + " map_reader.py %d" % args.sparse_feature_dim
     dataset.set_pipe_command(pipe_command)
@@ -50,27 +54,36 @@ def train():
     thread_num = 1
     dataset.set_thread(thread_num)
     #self-define how to split training files, for example: "split -a 2 -d -l 200000 normed_train.txt normed_train"
-    whole_filelist = ["./out/normed_train%d" % x for x in range(len(os.listdir("out")))]
-    whole_filelist = ["./out/normed_train00", "./out/normed_train01", "./out/normed_train02", "./out/normed_train03",
-                      "./out/normed_train04", "./out/normed_train05", "./out/normed_train06", "./out/normed_train07",
-                      "./out/normed_train08",
-                      "./out/normed_train09", "./out/normed_train10", "./out/normed_train11"]
+    whole_filelist = [
+        "./out/normed_train%d" % x for x in range(len(os.listdir("out")))
+    ]
+    whole_filelist = [
+        "./out/normed_train00", "./out/normed_train01", "./out/normed_train02",
+        "./out/normed_train03", "./out/normed_train04", "./out/normed_train05",
+        "./out/normed_train06", "./out/normed_train07", "./out/normed_train08",
+        "./out/normed_train09", "./out/normed_train10", "./out/normed_train11"
+    ]
     print("ready to epochs")
     epochs = 10
     for i in range(epochs):
         print("start %dth epoch" % i)
         dataset.set_filelist(whole_filelist[:int(len(whole_filelist))])
         #print the information you want by setting fetch_list and fetch_info
-        exe.train_from_dataset(program=fluid.default_main_program(),
-                               dataset=dataset,
-                               fetch_list=[auc_var, accuracy, predict, label],
-                               fetch_info=["auc", "accuracy", "predict", "label"],
-                               debug=False)
-        model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model"
+        exe.train_from_dataset(
+            program=fluid.default_main_program(),
+            dataset=dataset,
+            fetch_list=[auc_var, accuracy, predict, label],
+            fetch_info=["auc", "accuracy", "predict", "label"],
+            debug=False)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch' + str(i + 1) + ".model")
         sys.stderr.write("epoch%d finished" % (i + 1))
         #save model
-        fluid.io.save_inference_model(model_dir, [dense_feature.name] + [x.name for x in context_feature] + [context_feature_fm.name] + [label.name],
-                                      [loss, auc_var, accuracy, predict, label], exe)
+        fluid.io.save_inference_model(
+            model_dir,
+            [dense_feature.name] + [x.name for x in context_feature] +
+            [context_feature_fm.name] + [label.name],
+            [loss, auc_var, accuracy, predict, label], exe)
 
 
 if __name__ == '__main__':
diff --git a/PaddleRec/ctr/dcn/README.md b/PaddleRec/ctr/dcn/README.md
index 4c59c39a..1ac51ddc 100644
--- a/PaddleRec/ctr/dcn/README.md
+++ b/PaddleRec/ctr/dcn/README.md
@@ -32,7 +32,7 @@ DCN模型介绍可以参阅论文[Deep & Cross Network for Ad Click Predictions]
 
 数据下载命令
 ```bash
-cd data && sh download.sh
+cd data && python download.py
 ```
 
 ## 数据处理
@@ -70,13 +70,14 @@ loss: [0.44703564] auc_val: [0.80654419]
 ## 多机训练
 首先使用命令下载并预处理小规模样例数据集:
 ```bash
-cd dist_data && sh dist_download.sh && cd ..
+cd dist_data && python dist_download.py && cd ..
 ```
 
 运行命令本地模拟多机场景,默认使用2 X 2,即2个pserver,2个trainer的方式组网训练。
 
 **注意:在多机训练中,建议使用Paddle 1.6版本以上或[最新版本](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev)。**
 
 ```bash
+# 该sh不支持Windows
 sh cluster_train.sh
 ```
 参数说明:
@@ -102,7 +103,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --test_valid_da
 - 0号trainer保存模型参数
 - 每次训练完成后需要手动停止pserver进程,使用以下命令查看pserver进程:
-  
+
   >ps -ef | grep python
-  
+
 - 数据读取使用dataset模式,目前仅支持运行在Linux环境下
diff --git a/PaddleRec/ctr/dcn/cluster_train.py b/PaddleRec/ctr/dcn/cluster_train.py
index aa862ea3..601d3a94 100644
--- a/PaddleRec/ctr/dcn/cluster_train.py
+++ b/PaddleRec/ctr/dcn/cluster_train.py
@@ -162,7 +162,8 @@ def train():
             fetch_info=['total_loss', 'avg_logloss', 'auc'],
             debug=False,
             print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
         sys.stderr.write('epoch%d is finished and takes %f s\n' % (
             (epoch_id + 1), time.time() - start))
         if args.trainer_id == 0:  # only trainer 0 save model
diff --git a/PaddleRec/ctr/dcn/config.py b/PaddleRec/ctr/dcn/config.py
index e17b9dc5..dafb009e 100644
--- a/PaddleRec/ctr/dcn/config.py
+++ b/PaddleRec/ctr/dcn/config.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# coding: utf-8
 import argparse
 """
 global params
diff --git a/PaddleRec/ctr/dcn/data/download.py b/PaddleRec/ctr/dcn/data/download.py
new file mode 100644
index 00000000..b2fedfe8
--- /dev/null
+++ b/PaddleRec/ctr/dcn/data/download.py
@@ -0,0 +1,24 @@
+import os
+import sys
+import io
+
+LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
+TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
+sys.path.append(TOOLS_PATH)
+
+from tools import download_file_and_uncompress
+
+if __name__ == '__main__':
+    trainfile = 'train.txt'
+    url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
+
+    print("download and extract starting...")
+    download_file_and_uncompress(url)
+    print("download and extract finished")
+
+    count = 0
+    for _ in io.open(trainfile, 'r', encoding='utf-8'):
+        count += 1
+
+    print("total records: %d" % count)
+    print("done")
diff --git a/PaddleRec/ctr/dcn/data/download.sh b/PaddleRec/ctr/dcn/data/download.sh
deleted file mode 100755
index 9d96285b..00000000
--- a/PaddleRec/ctr/dcn/data/download.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-
-workdir=$(cd $(dirname $0); 
pwd) - -cd $workdir - -trainfile='train.txt' - -echo "data dir:" ${workdir} - -cd $workdir - -echo "download data starting..." -wget --no-check-certificate -c https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz -echo "download finished" - -echo "extracting ..." -tar xzf dac.tar.gz >/dev/null 2>&1 -wc -l $trainfile | awk '{print $1}' > line_nums.log - -echo "extract finished" -echo "total records: "`cat line_nums.log` -echo "done" diff --git a/PaddleRec/ctr/dcn/data/preprocess.py b/PaddleRec/ctr/dcn/data/preprocess.py index 6ee5044b..dd23c7dd 100644 --- a/PaddleRec/ctr/dcn/data/preprocess.py +++ b/PaddleRec/ctr/dcn/data/preprocess.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import os @@ -19,7 +17,6 @@ VOCAB_DIR = 'vocab' TRAIN_DIR = 'train' TEST_VALID_DIR = 'test_valid' SPLIT_RATIO = 0.9 -LINE_NUMS = "line_nums.log" FREQ_THR = 10 INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)] @@ -113,11 +110,13 @@ def split_data(): fout.close() data_dir = TEST_VALID_DIR cur_part_idx = int(line_idx / 200000) - fout = open(data_dir + '/part-' + str(cur_part_idx), 'w') + fout = open( + os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w') if line_idx % 200000 == 0 and line_idx != 0: fout.close() cur_part_idx = int(line_idx / 200000) - fout = open(data_dir + '/part-' + str(cur_part_idx), 'w') + fout = open( + os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w') fout.write(line) fout.close() fin.close() diff --git a/PaddleRec/ctr/dcn/dist_data/dist_download.py b/PaddleRec/ctr/dcn/dist_data/dist_download.py new file mode 100644 index 00000000..662982f6 --- /dev/null +++ b/PaddleRec/ctr/dcn/dist_data/dist_download.py @@ -0,0 +1,19 @@ +from __future__ import print_function +import os +import sys +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url, savename="dist_data_demo.tar.gz") + print("download and extract finished") + + print("preprocessing...") + os.system("python dist_preprocess.py") + print("preprocess done") \ No newline at end of file diff --git a/PaddleRec/ctr/dcn/dist_data/dist_download.sh b/PaddleRec/ctr/dcn/dist_data/dist_download.sh deleted file mode 100755 index 78b3841a..00000000 --- a/PaddleRec/ctr/dcn/dist_data/dist_download.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -# download small demo dataset -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz -tar xzvf dist_data_demo.tar.gz -# preprocess demo dataset -python dist_preprocess.py diff --git a/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py b/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py index 6a4c801c..afad881b 100644 --- a/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py +++ b/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import os @@ -21,7 +19,6 @@ TEST_DIR = 'dist_test_valid_data' TRAIN_FILE = os.path.join(TRAIN_DIR, 'tr') TEST_FILE = os.path.join(TEST_DIR, 'ev') SPLIT_RATIO = 0.9 -LINE_NUMS = "line_nums.log" FREQ_THR = 10 INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)] diff --git 
a/PaddleRec/ctr/dcn/infer.py b/PaddleRec/ctr/dcn/infer.py
index 25e1337d..47e628e0 100644
--- a/PaddleRec/ctr/dcn/infer.py
+++ b/PaddleRec/ctr/dcn/infer.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# coding: utf-8
 import logging
 import random
@@ -46,7 +44,8 @@ def infer():
 
     startup_program = fluid.framework.Program()
     test_program = fluid.framework.Program()
-    cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
+    cur_model_path = os.path.join(args.model_output_dir,
+                                  'epoch_' + args.test_epoch)
 
     with fluid.scope_guard(inference_scope):
         with fluid.framework.program_guard(test_program, startup_program):
diff --git a/PaddleRec/ctr/dcn/local_train.py b/PaddleRec/ctr/dcn/local_train.py
index bb8c4240..29badfe4 100644
--- a/PaddleRec/ctr/dcn/local_train.py
+++ b/PaddleRec/ctr/dcn/local_train.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# coding: utf-8
 from __future__ import print_function, absolute_import, division
 import os
 import random
@@ -75,7 +73,8 @@ def train(args):
             fetch_info=['total_loss', 'avg_logloss', 'auc'],
             debug=False,
             print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
         sys.stderr.write('epoch%d is finished and takes %f s\n' % (
             (epoch_id + 1), time.time() - start))
         fluid.io.save_persistables(
diff --git a/PaddleRec/ctr/dcn/network.py b/PaddleRec/ctr/dcn/network.py
index f0637227..bb29b26b 100644
--- a/PaddleRec/ctr/dcn/network.py
+++ b/PaddleRec/ctr/dcn/network.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# coding: utf-8
 from __future__ import print_function, absolute_import, division
 import paddle.fluid as fluid
 from collections import OrderedDict
diff --git a/PaddleRec/ctr/dcn/reader.py b/PaddleRec/ctr/dcn/reader.py
index aa915f6e..291fc988 100644
--- a/PaddleRec/ctr/dcn/reader.py
+++ b/PaddleRec/ctr/dcn/reader.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-# coding: utf-8
 """
 dataset and reader
 """
diff --git a/PaddleRec/ctr/deepfm/README.md b/PaddleRec/ctr/deepfm/README.md
index 9ff01aa1..ace75e54 100644
--- a/PaddleRec/ctr/deepfm/README.md
+++ b/PaddleRec/ctr/deepfm/README.md
@@ -25,7 +25,7 @@ To preprocess the raw dataset, we min-max normalize continuous features to [0, 1
 
 Download and preprocess data:
 ```bash
-cd data && sh download_preprocess.sh && cd ..
+cd data && python download_preprocess.py && cd ..
 ```
 
 After executing these commands, 3 folders "train_data", "test_data" and "aid_data" will be generated. The folder "train_data" contains 90% of the raw data, while the remaining 10% is in "test_data". The folder "aid_data" contains a created feature dictionary "feat_dict.pkl2".
@@ -58,12 +58,13 @@ We emulate distributed training on a local machine. In default, we use 2 X 2,i
 
 ### Download and preprocess distributed demo dataset
 This small demo dataset (a few lines from the Criteo dataset) only tests whether distributed training can run.
 ```bash
-cd dist_data && sh dist_data_download.sh && cd ..
+cd dist_data && python dist_data_download.py && cd ..
 ```
 
 ### Distributed Train and Infer
 Train
 ```bash
+# this shell script does not support Windows
 sh cluster_train.sh
 ```
 params of cluster_train.sh:
@@ -89,7 +90,7 @@ Notes:
 - The first trainer (with trainer_id 0) saves model params.
 - After each training run, pserver processes should be stopped manually. You can use the command below:
-  
+
   >ps -ef | grep python
 
 - We use the Dataset API to load data; it is currently only supported on Linux.
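The `cluster_train.sh` script launches the two pservers and two trainers as background jobs, which is why the READMEs flag it as Linux-only. For reference, the same 2 X 2 local simulation could be launched portably from Python with the standard library; this is only a sketch, and the flag names (`--role`, `--endpoints`, `--current_endpoint`, `--trainer_id`) are illustrative and must be matched to the arguments `cluster_train.py` actually accepts:

```python
import subprocess
import sys

ENDPOINTS = "127.0.0.1:36011,127.0.0.1:36012"

# two pservers, one per endpoint (hypothetical flags)
pservers = [
    subprocess.Popen([
        sys.executable, "cluster_train.py", "--role", "pserver",
        "--endpoints", ENDPOINTS, "--current_endpoint", ep
    ]) for ep in ENDPOINTS.split(",")
]

# two trainers, identified by trainer_id
trainers = [
    subprocess.Popen([
        sys.executable, "cluster_train.py", "--role", "trainer",
        "--endpoints", ENDPOINTS, "--trainer_id", str(tid)
    ]) for tid in range(2)
]

for p in trainers:
    p.wait()       # trainers exit after the last epoch
for p in pservers:
    p.terminate()  # pservers run until stopped, as the notes above point out
```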
diff --git a/PaddleRec/ctr/deepfm/cluster_train.py b/PaddleRec/ctr/deepfm/cluster_train.py index 5f03fee9..5cac2419 100644 --- a/PaddleRec/ctr/deepfm/cluster_train.py +++ b/PaddleRec/ctr/deepfm/cluster_train.py @@ -133,7 +133,7 @@ def train(): dataset.set_batch_size(args.batch_size) dataset.set_thread(args.num_thread) train_filelist = [ - args.train_data_dir + '/' + x + os.path.join(args.train_data_dir, x) for x in os.listdir(args.train_data_dir) ] @@ -156,7 +156,8 @@ def train(): fetch_info=['epoch %d batch loss' % (epoch_id + 1)], print_period=5, debug=False) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) if args.trainer_id == 0: # only trainer 0 save model diff --git a/PaddleRec/ctr/deepfm/data/download_preprocess.py b/PaddleRec/ctr/deepfm/data/download_preprocess.py new file mode 100644 index 00000000..05461023 --- /dev/null +++ b/PaddleRec/ctr/deepfm/data/download_preprocess.py @@ -0,0 +1,25 @@ +import os +import shutil +import sys + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress, download_file + +if __name__ == '__main__': + url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz" + url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2" + + print("download and extract starting...") + download_file_and_uncompress(url) + download_file(url2, "./aid_data/feat_dict_10.pkl2", True) + print("download and extract finished") + + print("preprocessing...") + os.system("python preprocess.py") + print("preprocess done") + + shutil.rmtree("raw_data") + print("done") diff --git a/PaddleRec/ctr/deepfm/data/download_preprocess.sh b/PaddleRec/ctr/deepfm/data/download_preprocess.sh deleted file mode 100644 index eed4ffe2..00000000 --- a/PaddleRec/ctr/deepfm/data/download_preprocess.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2 -O ./aid_data/feat_dict_10.pkl2 || rm -f ./aid_data/feat_dict_10.pkl2 -tar zxf dac.tar.gz >/dev/null 2>&1 -rm -f dac.tar.gz - -python preprocess.py -rm *.txt -rm -r raw_data diff --git a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py new file mode 100644 index 00000000..63e2756d --- /dev/null +++ b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py @@ -0,0 +1,22 @@ +import os +import shutil +import sys + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url, savename="dist_data_demo.tar.gz") + print("download and extract finished") + + print("preprocessing...") + os.system("python preprocess_dist.py") + print("preprocess done") + + print("done") \ No newline at end of file diff --git a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh 
deleted file mode 100644 index 6db0dbdc..00000000 --- a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -# download small demo dataset -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz -tar xzvf dist_data_demo.tar.gz -# preprocess dataset -python preprocess_dist.py diff --git a/PaddleRec/ctr/deepfm/infer.py b/PaddleRec/ctr/deepfm/infer.py index 527d389c..cb798a6a 100644 --- a/PaddleRec/ctr/deepfm/infer.py +++ b/PaddleRec/ctr/deepfm/infer.py @@ -25,7 +25,8 @@ def infer(): inference_scope = fluid.Scope() test_files = [ - args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir) + os.path.join(args.test_data_dir, x) + for x in os.listdir(args.test_data_dir) ] criteo_dataset = CriteoDataset() criteo_dataset.setup(args.feat_dict) @@ -34,7 +35,8 @@ def infer(): startup_program = fluid.framework.Program() test_program = fluid.framework.Program() - cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch + cur_model_path = os.path.join(args.model_output_dir, + 'epoch_' + args.test_epoch) with fluid.scope_guard(inference_scope): with fluid.framework.program_guard(test_program, startup_program): diff --git a/PaddleRec/ctr/deepfm/local_train.py b/PaddleRec/ctr/deepfm/local_train.py index d81ad518..dbf62d96 100644 --- a/PaddleRec/ctr/deepfm/local_train.py +++ b/PaddleRec/ctr/deepfm/local_train.py @@ -36,7 +36,8 @@ def train(): dataset.set_batch_size(args.batch_size) dataset.set_thread(args.num_thread) train_filelist = [ - args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir) + os.path.join(args.train_data_dir, x) + for x in os.listdir(args.train_data_dir) ] print('---------------------------------------------') @@ -50,7 +51,8 @@ def train(): fetch_info=['epoch %d batch loss' % (epoch_id + 1)], print_period=1000, debug=False) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) fluid.io.save_persistables( diff --git a/PaddleRec/ctr/dnn/README.cn.md b/PaddleRec/ctr/dnn/README.cn.md index 54a7d55b..e5b6ef40 100644 --- a/PaddleRec/ctr/dnn/README.cn.md +++ b/PaddleRec/ctr/dnn/README.cn.md @@ -29,7 +29,7 @@ pip install -r requirements.txt 下载数据集: ```bash -cd data && ./download.sh && cd .. +cd data && python download.py && cd .. ``` ## 模型 @@ -56,6 +56,7 @@ python train.py \ 本地启动一个2 trainer 2 pserver的分布式训练任务,分布式场景下训练数据会按照trainer的id进行切分,保证trainer之间的训练数据不会重叠,提高训练效率 ```bash +# 该sh不支持Windows sh cluster_train.sh ``` diff --git a/PaddleRec/ctr/dnn/README.md b/PaddleRec/ctr/dnn/README.md index 9587a2a8..7c4678f2 100644 --- a/PaddleRec/ctr/dnn/README.md +++ b/PaddleRec/ctr/dnn/README.md @@ -39,7 +39,7 @@ categorical features. For the test dataset, the labels are omitted. Download dataset: ```bash -cd data && ./download.sh && cd .. +cd data && python download.py && cd .. 
```
 
 ## Model
 
@@ -73,6 +73,7 @@ In distributed training setting, training data is split by trainer_id, so that
 the shards do not overlap among trainers
 
 ```bash
+# this shell script does not support Windows
 sh cluster_train.sh
 ```
 
diff --git a/PaddleRec/ctr/dnn/data/download.py b/PaddleRec/ctr/dnn/data/download.py
new file mode 100644
index 00000000..b88191ff
--- /dev/null
+++ b/PaddleRec/ctr/dnn/data/download.py
@@ -0,0 +1,28 @@
+import os
+import shutil
+import sys
+import glob
+
+LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
+TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
+sys.path.append(TOOLS_PATH)
+
+from tools import download_file_and_uncompress
+
+if __name__ == '__main__':
+    url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
+
+    print("download and extract starting...")
+    download_file_and_uncompress(url)
+    print("download and extract finished")
+
+    if os.path.exists("raw"):
+        shutil.rmtree("raw")
+    os.mkdir("raw")
+
+    # mv ./*.txt raw/
+    files = glob.glob("*.txt")
+    for f in files:
+        shutil.move(f, "raw")
+
+    print("done")
diff --git a/PaddleRec/ctr/dnn/train.py b/PaddleRec/ctr/dnn/train.py
index f63edeeb..dfb056d3 100644
--- a/PaddleRec/ctr/dnn/train.py
+++ b/PaddleRec/ctr/dnn/train.py
@@ -173,8 +173,8 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
                           .format(pass_id, batch_id,
                                   loss_val / args.batch_size, auc_val,
                                   batch_auc_val))
                 if batch_id % 1000 == 0 and batch_id != 0:
-                    model_dir = args.model_output_dir + '/batch-' + str(
-                        batch_id)
+                    model_dir = os.path.join(args.model_output_dir,
+                                             'batch-' + str(batch_id))
                     if args.trainer_id == 0:
                         fluid.io.save_persistables(
                             executor=exe,
@@ -188,7 +188,7 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
 
         total_time += time.time() - pass_start
 
-        model_dir = args.model_output_dir + '/pass-' + str(pass_id)
+        model_dir = os.path.join(args.model_output_dir, 'pass-' + str(pass_id))
         if args.trainer_id == 0:
             fluid.io.save_persistables(
                 executor=exe,
diff --git a/PaddleRec/ctr/tools/tools.py b/PaddleRec/ctr/tools/tools.py
new file mode 100644
index 00000000..da34a027
--- /dev/null
+++ b/PaddleRec/ctr/tools/tools.py
@@ -0,0 +1,133 @@
+import os
+import time
+import shutil
+import requests
+import sys
+import tarfile
+import zipfile
+import platform
+import functools
+
+lasttime = time.time()
+FLUSH_INTERVAL = 0.1
+
+LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
+
+
+def get_platform():
+    return platform.platform()
+
+
+def is_windows():
+    return get_platform().lower().startswith("windows")
+
+
+def progress(msg, end=False):
+    global lasttime
+    if end:
+        msg += "\n"
+        lasttime = 0
+    if time.time() - lasttime >= FLUSH_INTERVAL:
+        sys.stdout.write("\r%s" % msg)
+        lasttime = time.time()
+        sys.stdout.flush()
+
+
+def download_file(url, savepath, print_progress):
+    r = requests.get(url, stream=True)
+    total_length = r.headers.get('content-length')
+
+    if total_length is None:
+        with open(savepath, 'wb') as f:
+            shutil.copyfileobj(r.raw, f)
+    else:
+        with open(savepath, 'wb') as f:
+            dl = 0
+            total_length = int(total_length)
+            starttime = time.time()
+            if print_progress:
+                print("Downloading %s" % os.path.basename(savepath))
+            for data in r.iter_content(chunk_size=4096):
+                dl += len(data)
+                f.write(data)
+                if print_progress:
+                    done = int(50 * dl / total_length)
+                    progress("[%-50s] %.2f%%" %
+                             ('=' * done, float(100 * dl) / total_length))
+        if print_progress:
+            progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)
+
+
+def 
_uncompress_file(filepath, extrapath, delete_file, print_progress): + if print_progress: + print("Uncompress %s" % os.path.basename(filepath)) + + if filepath.endswith("zip"): + handler = _uncompress_file_zip + elif filepath.endswith("tgz"): + handler = _uncompress_file_tar + else: + handler = functools.partial(_uncompress_file_tar, mode="r") + + for total_num, index, rootpath in handler(filepath, extrapath): + if print_progress: + done = int(50 * float(index) / total_num) + progress("[%-50s] %.2f%%" % + ('=' * done, float(100 * index) / total_num)) + if print_progress: + progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True) + + if delete_file: + os.remove(filepath) + + return rootpath + + +def _uncompress_file_zip(filepath, extrapath): + files = zipfile.ZipFile(filepath, 'r') + filelist = files.namelist() + rootpath = filelist[0] + total_num = len(filelist) + for index, file in enumerate(filelist): + files.extract(file, extrapath) + yield total_num, index, rootpath + files.close() + yield total_num, index, rootpath + + +def _uncompress_file_tar(filepath, extrapath, mode="r:gz"): + files = tarfile.open(filepath, mode) + filelist = files.getnames() + total_num = len(filelist) + rootpath = filelist[0] + for index, file in enumerate(filelist): + files.extract(file, extrapath) + yield total_num, index, rootpath + files.close() + yield total_num, index, rootpath + + +def download_file_and_uncompress(url, + savepath=None, + savename=None, + extrapath=None, + print_progress=True, + cover=False, + delete_file=False): + if savepath is None: + savepath = "." + + if extrapath is None: + extrapath = "." + + if savename is None: + savename = url.split("/")[-1] + savepath = os.path.join(savepath, savename) + + if cover: + if os.path.exists(savepath): + shutil.rmtree(savepath) + + if not os.path.exists(savepath): + download_file(url, savepath, print_progress) + _ = _uncompress_file(savepath, extrapath, delete_file, print_progress) diff --git a/PaddleRec/ctr/xdeepfm/README.md b/PaddleRec/ctr/xdeepfm/README.md index cf759ec0..57341fc1 100644 --- a/PaddleRec/ctr/xdeepfm/README.md +++ b/PaddleRec/ctr/xdeepfm/README.md @@ -8,7 +8,7 @@ demo数据集,在data目录下执行命令,下载数据 ```bash -sh download.sh +python download.py ``` ## 环境 @@ -39,6 +39,7 @@ test_epoch设置加载第10轮训练的模型。 数据下载同上面命令。 ```bash +# 该sh不支持Windows sh cluster_train.sh ``` 参数说明: @@ -64,7 +65,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --use_gpu=0 - 0号trainer保存模型参数 - 每次训练完成后需要手动停止pserver进程,使用以下命令查看pserver进程: - + >ps -ef | grep python - 数据读取使用dataset模式,目前仅支持运行在Linux环境下 diff --git a/PaddleRec/ctr/xdeepfm/cluster_train.py b/PaddleRec/ctr/xdeepfm/cluster_train.py index 0c2b4ea7..727e921c 100644 --- a/PaddleRec/ctr/xdeepfm/cluster_train.py +++ b/PaddleRec/ctr/xdeepfm/cluster_train.py @@ -139,7 +139,7 @@ def train(): dataset.set_pipe_command('python criteo_reader.py') dataset.set_batch_size(args.batch_size) dataset.set_filelist([ - args.train_data_dir + '/' + x + os.path.join(args.train_data_dir, x) for x in os.listdir(args.train_data_dir) ]) @@ -161,7 +161,8 @@ def train(): fetch_info=['loss', 'auc'], debug=False, print_period=args.print_steps) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) if args.trainer_id == 0: # only trainer 0 save model diff --git a/PaddleRec/ctr/xdeepfm/data/download.py b/PaddleRec/ctr/xdeepfm/data/download.py new file mode 
100644
index 00000000..4b216967
--- /dev/null
+++ b/PaddleRec/ctr/xdeepfm/data/download.py
@@ -0,0 +1,28 @@
+import os
+import shutil
+import sys
+
+LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
+TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
+sys.path.append(TOOLS_PATH)
+
+from tools import download_file_and_uncompress, download_file
+
+if __name__ == '__main__':
+    url_train = "https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr"
+    url_test = "https://paddlerec.bj.bcebos.com/xdeepfm%2Fev"
+
+    train_dir = "train_data"
+    test_dir = "test_data"
+
+    if not os.path.exists(train_dir):
+        os.mkdir(train_dir)
+    if not os.path.exists(test_dir):
+        os.mkdir(test_dir)
+
+    print("download starting...")
+    download_file(url_train, "./train_data/tr", True)
+    download_file(url_test, "./test_data/ev", True)
+    print("download finished")
+
+    print("done")
diff --git a/PaddleRec/ctr/xdeepfm/data/download.sh b/PaddleRec/ctr/xdeepfm/data/download.sh
deleted file mode 100644
index 95438938..00000000
--- a/PaddleRec/ctr/xdeepfm/data/download.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/bin/bash
-
-if [ ! -d "train_data" ]; then
-    mkdir train_data
-fi
-
-if [ ! -d "test_data" ]; then
-    mkdir test_data
-fi
-
-wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Fev -O ./test_data/ev
-wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr -O ./train_data/tr
diff --git a/PaddleRec/ctr/xdeepfm/infer.py b/PaddleRec/ctr/xdeepfm/infer.py
index dbac3579..1b89a028 100644
--- a/PaddleRec/ctr/xdeepfm/infer.py
+++ b/PaddleRec/ctr/xdeepfm/infer.py
@@ -26,7 +26,8 @@ def infer():
     inference_scope = fluid.Scope()
 
     test_files = [
-        args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir)
+        os.path.join(args.test_data_dir, x)
+        for x in os.listdir(args.test_data_dir)
     ]
     criteo_dataset = CriteoDataset()
     test_reader = paddle.batch(
@@ -34,7 +35,8 @@ def infer():
 
     startup_program = fluid.framework.Program()
     test_program = fluid.framework.Program()
-    cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
+    cur_model_path = os.path.join(args.model_output_dir,
+                                  'epoch_' + args.test_epoch)
 
     with fluid.scope_guard(inference_scope):
         with fluid.framework.program_guard(test_program, startup_program):
diff --git a/PaddleRec/ctr/xdeepfm/local_train.py b/PaddleRec/ctr/xdeepfm/local_train.py
index d53dc882..32c02f9c 100644
--- a/PaddleRec/ctr/xdeepfm/local_train.py
+++ b/PaddleRec/ctr/xdeepfm/local_train.py
@@ -4,6 +4,7 @@ import paddle.fluid as fluid
 import sys
 import network_conf
 import time
+import utils
 
 
 def train():
@@ -25,7 +26,8 @@ def train():
     dataset.set_pipe_command('python criteo_reader.py')
     dataset.set_batch_size(args.batch_size)
     dataset.set_filelist([
-        args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir)
+        os.path.join(args.train_data_dir, x)
+        for x in os.listdir(args.train_data_dir)
     ])
 
     if args.use_gpu == 1:
@@ -46,7 +48,8 @@ def train():
             fetch_info=['loss', 'auc'],
             debug=False,
             print_period=args.print_steps)
-        model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
+        model_dir = os.path.join(args.model_output_dir,
+                                 'epoch_' + str(epoch_id + 1))
         sys.stderr.write('epoch%d is finished and takes %f s\n' % (
             (epoch_id + 1), time.time() - start))
         fluid.io.save_persistables(
-- 
GitLab
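All of the new `download.py` scripts above share `tools.py` through a relative `sys.path` entry. One detail is easy to miss: `download_file_and_uncompress` defaults `savename` to `url.split("/")[-1]`, and the BOS URLs used here escape the slash in the object name (for example `deepfm%2Fdist_data_demo.tar.gz`), so the default would keep the literal `%2F` in the local file name, which is why the dist-data scripts pass `savename` explicitly. A usage sketch (`delete_file=True` is valid per the signature above but is not used by the scripts in this patch):

```python
import os
import sys

# make PaddleRec/ctr/tools importable, the same way the download scripts do
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(LOCAL_PATH, "..", "..", "tools"))

from tools import download_file_and_uncompress

if __name__ == '__main__':
    url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz"
    # without savename, the archive would be saved as "deepfm%2Fdist_data_demo.tar.gz"
    download_file_and_uncompress(
        url, savename="dist_data_demo.tar.gz", delete_file=True)
```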