diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py index 8c39950f9329b4f0a2d4fe2512b1833bd796ccf3..66bf13d250e5487696177f6d710cd2ec73944d97 100644 --- a/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py +++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/generate_test.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - import argparse import logging import numpy as np @@ -22,12 +21,12 @@ import os os.environ["CUDA_VISIBLE_DEVICES"] = "" import paddle import paddle.fluid as fluid -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) num_context_feature = 22 + def parse_args(): parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example") parser.add_argument( @@ -59,6 +58,7 @@ def parse_args(): return parser.parse_args() + def to_lodtensor(data, place): seq_lens = [len(seq) for seq in data] cur_len = 0 @@ -72,7 +72,6 @@ def to_lodtensor(data, place): res.set(flattened_data, place) res.set_lod([lod]) - return res @@ -91,7 +90,8 @@ def data2tensor(data, place): sparse_data = to_lodtensor([x[1 + i] for x in data], place) feed_dict["context" + str(i)] = sparse_data - context_fm = to_lodtensor(np.array([x[-2] for x in data]).astype("float32"), place) + context_fm = to_lodtensor( + np.array([x[-2] for x in data]).astype("float32"), place) feed_dict["context_fm"] = context_fm y_data = np.array([x[-1] for x in data]).astype("int64") @@ -99,6 +99,7 @@ def data2tensor(data, place): feed_dict["label"] = y_data return feed_dict + def test(): args = parse_args() @@ -112,13 +113,14 @@ def test(): exe = fluid.Executor(place) whole_filelist = ["./out/normed_test_session.txt"] - test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))] - + test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len( + whole_filelist))] epochs = 1 for i in range(epochs): - cur_model_path = args.model_path + "/epoch" + str(1) + ".model" + cur_model_path = os.path.join(args.model_path, + "epoch" + str(1) + ".model") with open("./testres/res" + str(i), 'w') as r: with fluid.scope_guard(test_scope): [inference_program, feed_target_names, fetch_targets] = \ @@ -129,9 +131,11 @@ def test(): for batch_id, data in enumerate(test_reader()): print(len(data[0])) feed_dict = data2tensor(data, place) - loss_val, auc_val, accuracy, predict, _ = exe.run(inference_program, - feed=feed_dict, - fetch_list=fetch_targets, return_numpy=False) + loss_val, auc_val, accuracy, predict, _ = exe.run( + inference_program, + feed=feed_dict, + fetch_list=fetch_targets, + return_numpy=False) x = np.array(predict) for j in range(x.shape[0]): diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py index 5c58ecf1380bd339601a7692bedce11a587dd940..c218ce0fccc94ee595fd1681b54258d8d6ce43c0 100644 --- a/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py +++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/infer.py @@ -12,8 +12,7 @@ import paddle.fluid as fluid import map_reader from network_conf import ctr_deepfm_dataset -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) @@ -91,15 +90,20 @@ def infer(): 
place = fluid.CPUPlace() inference_scope = fluid.core.Scope() - filelist = ["%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path)] + filelist = [ + "%s/%s" % (args.data_path, x) for x in os.listdir(args.data_path) + ] from map_reader import MapDataset map_dataset = MapDataset() map_dataset.setup(args.sparse_feature_dim) exe = fluid.Executor(place) - whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))] + whole_filelist = [ + "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data"))) + ] #whole_filelist = ["./out/normed_train09", "./out/normed_train10", "./out/normed_train11"] - test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len(whole_filelist))] + test_files = whole_filelist[int(0.0 * len(whole_filelist)):int(1.0 * len( + whole_filelist))] # file_groups = [whole_filelist[i:i+train_thread_num] for i in range(0, len(whole_filelist), train_thread_num)] @@ -110,7 +114,8 @@ def infer(): epochs = 2 for i in range(epochs): - cur_model_path = args.model_path + "/epoch" + str(i + 1) + ".model" + cur_model_path = os.path.join(args.model_path, + "epoch" + str(i + 1) + ".model") with fluid.scope_guard(inference_scope): [inference_program, feed_target_names, fetch_targets] = \ fluid.io.load_inference_model(cur_model_path, exe) @@ -120,9 +125,11 @@ def infer(): test_reader = map_dataset.infer_reader(test_files, 1000, 100000) for batch_id, data in enumerate(test_reader()): - loss_val, auc_val, accuracy, predict, label = exe.run(inference_program, - feed=data2tensor(data, place), - fetch_list=fetch_targets, return_numpy=False) + loss_val, auc_val, accuracy, predict, label = exe.run( + inference_program, + feed=data2tensor(data, place), + fetch_list=fetch_targets, + return_numpy=False) #print(np.array(predict)) #x = np.array(predict) diff --git a/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py b/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py index 62241f2b11fd2cdf642b6b2e778e3ecb7681d41a..9d7e9452a14e08d293d77bc41fc2806ca9c0d1a2 100644 --- a/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py +++ b/PaddleRec/ctr/Paddle_baseline_KDD2019/local_train.py @@ -6,33 +6,36 @@ import paddle.fluid as fluid import sys from network_confv6 import ctr_deepfm_dataset - NUM_CONTEXT_FEATURE = 22 DIM_USER_PROFILE = 10 DIM_DENSE_FEATURE = 3 -PYTHON_PATH = "/home/yaoxuefeng/whls/paddle_release_home/python/bin/python" # this is mine change yours +PYTHON_PATH = "/home/yaoxuefeng/whls/paddle_release_home/python/bin/python" # this is mine change yours + def train(): args = parse_args() if not os.path.isdir(args.model_output_dir): os.mkdir(args.model_output_dir) - + #set the input format for our model. 
Note that you need to carefully modify them when you define a new network #user_profile = fluid.layers.data( - #name="user_profile", shape=[DIM_USER_PROFILE], dtype='int64', lod_level=1) + #name="user_profile", shape=[DIM_USER_PROFILE], dtype='int64', lod_level=1) dense_feature = fluid.layers.data( name="dense_feature", shape=[DIM_DENSE_FEATURE], dtype='float32') context_feature = [ - fluid.layers.data(name="context" + str(i), shape=[1], lod_level=1, dtype="int64") - for i in range(0, NUM_CONTEXT_FEATURE)] + fluid.layers.data( + name="context" + str(i), shape=[1], lod_level=1, dtype="int64") + for i in range(0, NUM_CONTEXT_FEATURE) + ] context_feature_fm = fluid.layers.data( name="context_fm", shape=[1], dtype='int64', lod_level=1) label = fluid.layers.data(name='label', shape=[1], dtype='int64') print("ready to network") #self define network - loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset(dense_feature, context_feature, context_feature_fm, label, - args.embedding_size, args.sparse_feature_dim) + loss, auc_var, batch_auc_var, accuracy, predict = ctr_deepfm_dataset( + dense_feature, context_feature, context_feature_fm, label, + args.embedding_size, args.sparse_feature_dim) print("ready to optimize") optimizer = fluid.optimizer.SGD(learning_rate=1e-4) @@ -42,7 +45,8 @@ def train(): exe.run(fluid.default_startup_program()) #use dataset api for much faster speed dataset = fluid.DatasetFactory().create_dataset() - dataset.set_use_var([dense_feature] + context_feature + [context_feature_fm] + [label]) + dataset.set_use_var([dense_feature] + context_feature + + [context_feature_fm] + [label]) #self define how to process generated training insatnces in map_reader.py pipe_command = PYTHON_PATH + " map_reader.py %d" % args.sparse_feature_dim dataset.set_pipe_command(pipe_command) @@ -50,27 +54,36 @@ def train(): thread_num = 1 dataset.set_thread(thread_num) #self define how to split training files for example:"split -a 2 -d -l 200000 normed_train.txt normed_train" - whole_filelist = ["./out/normed_train%d" % x for x in range(len(os.listdir("out")))] - whole_filelist = ["./out/normed_train00", "./out/normed_train01", "./out/normed_train02", "./out/normed_train03", - "./out/normed_train04", "./out/normed_train05", "./out/normed_train06", "./out/normed_train07", - "./out/normed_train08", - "./out/normed_train09", "./out/normed_train10", "./out/normed_train11"] + whole_filelist = [ + "./out/normed_train%d" % x for x in range(len(os.listdir("out"))) + ] + whole_filelist = [ + "./out/normed_train00", "./out/normed_train01", "./out/normed_train02", + "./out/normed_train03", "./out/normed_train04", "./out/normed_train05", + "./out/normed_train06", "./out/normed_train07", "./out/normed_train08", + "./out/normed_train09", "./out/normed_train10", "./out/normed_train11" + ] print("ready to epochs") epochs = 10 for i in range(epochs): print("start %dth epoch" % i) dataset.set_filelist(whole_filelist[:int(len(whole_filelist))]) #print the informations you want by setting fetch_list and fetch_info - exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=[auc_var, accuracy, predict, label], - fetch_info=["auc", "accuracy", "predict", "label"], - debug=False) - model_dir = args.model_output_dir + '/epoch' + str(i + 1) + ".model" + exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=[auc_var, accuracy, predict, label], + fetch_info=["auc", "accuracy", "predict", "label"], + debug=False) + model_dir = 
os.path.join(args.model_output_dir, + 'epoch' + str(i + 1) + ".model") sys.stderr.write("epoch%d finished" % (i + 1)) #save model - fluid.io.save_inference_model(model_dir, [dense_feature.name] + [x.name for x in context_feature] + [context_feature_fm.name] + [label.name], - [loss, auc_var, accuracy, predict, label], exe) + fluid.io.save_inference_model( + model_dir, + [dense_feature.name] + [x.name for x in context_feature] + + [context_feature_fm.name] + [label.name], + [loss, auc_var, accuracy, predict, label], exe) if __name__ == '__main__': diff --git a/PaddleRec/ctr/dcn/README.md b/PaddleRec/ctr/dcn/README.md index 4c59c39a9a1b17d163597ce6727742ac2b751c2a..1ac51ddc132932ef691226874fbdbdf9a5d01c8e 100644 --- a/PaddleRec/ctr/dcn/README.md +++ b/PaddleRec/ctr/dcn/README.md @@ -32,7 +32,7 @@ DCN模型介绍可以参阅论文[Deep & Cross Network for Ad Click Predictions] 数据下载命令 ```bash -cd data && sh download.sh +cd data && python download.py ``` ## 数据处理 @@ -70,13 +70,14 @@ loss: [0.44703564] auc_val: [0.80654419] ## 多机训练 首先使用命令下载并预处理小规模样例数据集: ```bash -cd dist_data && sh dist_download.sh && cd .. +cd dist_data && python dist_download.py && cd .. ``` 运行命令本地模拟多机场景,默认使用2 X 2,即2个pserver,2个trainer的方式组网训练。 **注意:在多机训练中,建议使用Paddle 1.6版本以上或[最新版本](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev)。** ```bash +# 该sh不支持Windows sh cluster_train.sh ``` 参数说明: @@ -102,7 +103,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --test_valid_da - 0号trainer保存模型参数 - 每次训练完成后需要手动停止pserver进程,使用以下命令查看pserver进程: - + >ps -ef | grep python - + - 数据读取使用dataset模式,目前仅支持运行在Linux环境下 diff --git a/PaddleRec/ctr/dcn/cluster_train.py b/PaddleRec/ctr/dcn/cluster_train.py index aa862ea3a57c7e31f0122e195ad732f4d5fef302..601d3a9423f52ff710d42f898ca28ec546b072fc 100644 --- a/PaddleRec/ctr/dcn/cluster_train.py +++ b/PaddleRec/ctr/dcn/cluster_train.py @@ -162,7 +162,8 @@ def train(): fetch_info=['total_loss', 'avg_logloss', 'auc'], debug=False, print_period=args.print_steps) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) if args.trainer_id == 0: # only trainer 0 save model diff --git a/PaddleRec/ctr/dcn/config.py b/PaddleRec/ctr/dcn/config.py index e17b9dc5037ef00b4419988cf22ff70500f5844a..dafb009e2228f144af2e874fb057c4c4cb7ed913 100644 --- a/PaddleRec/ctr/dcn/config.py +++ b/PaddleRec/ctr/dcn/config.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 import argparse """ global params """ diff --git a/PaddleRec/ctr/dcn/data/download.py b/PaddleRec/ctr/dcn/data/download.py new file mode 100644 index 0000000000000000000000000000000000000000..b2fedfe83625970d0e47b9db0a373a99b457fe61 --- /dev/null +++ b/PaddleRec/ctr/dcn/data/download.py @@ -0,0 +1,24 @@ +import os +import sys +import io + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + trainfile = 'train.txt' + url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url) + print("download and extract finished") + + count = 0 + for _ in io.open(trainfile, 'r', encoding='utf-8'): + count += 1 + + print("total records: %d" % count) + 
print("done") diff --git a/PaddleRec/ctr/dcn/data/download.sh b/PaddleRec/ctr/dcn/data/download.sh deleted file mode 100755 index 9d96285bf4c303f50a88e4c0767a757e6cfa1727..0000000000000000000000000000000000000000 --- a/PaddleRec/ctr/dcn/data/download.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -workdir=$(cd $(dirname $0); pwd) - -cd $workdir - -trainfile='train.txt' - -echo "data dir:" ${workdir} - -cd $workdir - -echo "download data starting..." -wget --no-check-certificate -c https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz -echo "download finished" - -echo "extracting ..." -tar xzf dac.tar.gz >/dev/null 2>&1 -wc -l $trainfile | awk '{print $1}' > line_nums.log - -echo "extract finished" -echo "total records: "`cat line_nums.log` -echo "done" diff --git a/PaddleRec/ctr/dcn/data/preprocess.py b/PaddleRec/ctr/dcn/data/preprocess.py index 6ee5044bac370979b4416528bbcd35411ea37a4e..dd23c7ddb42a99123cdaa450199d210925ca206f 100644 --- a/PaddleRec/ctr/dcn/data/preprocess.py +++ b/PaddleRec/ctr/dcn/data/preprocess.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import os @@ -19,7 +17,6 @@ VOCAB_DIR = 'vocab' TRAIN_DIR = 'train' TEST_VALID_DIR = 'test_valid' SPLIT_RATIO = 0.9 -LINE_NUMS = "line_nums.log" FREQ_THR = 10 INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)] @@ -113,11 +110,13 @@ def split_data(): fout.close() data_dir = TEST_VALID_DIR cur_part_idx = int(line_idx / 200000) - fout = open(data_dir + '/part-' + str(cur_part_idx), 'w') + fout = open( + os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w') if line_idx % 200000 == 0 and line_idx != 0: fout.close() cur_part_idx = int(line_idx / 200000) - fout = open(data_dir + '/part-' + str(cur_part_idx), 'w') + fout = open( + os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w') fout.write(line) fout.close() fin.close() diff --git a/PaddleRec/ctr/dcn/dist_data/dist_download.py b/PaddleRec/ctr/dcn/dist_data/dist_download.py new file mode 100644 index 0000000000000000000000000000000000000000..662982f6d6738ad90accd6b03dca7a21eb9fb3ae --- /dev/null +++ b/PaddleRec/ctr/dcn/dist_data/dist_download.py @@ -0,0 +1,19 @@ +from __future__ import print_function +import os +import sys +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url, savename="dist_data_demo.tar.gz") + print("download and extract finished") + + print("preprocessing...") + os.system("python dist_preprocess.py") + print("preprocess done") \ No newline at end of file diff --git a/PaddleRec/ctr/dcn/dist_data/dist_download.sh b/PaddleRec/ctr/dcn/dist_data/dist_download.sh deleted file mode 100755 index 78b3841ae9076942d317b9da0bcb3f0cd65a6be9..0000000000000000000000000000000000000000 --- a/PaddleRec/ctr/dcn/dist_data/dist_download.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -# download small demo dataset -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz -tar xzvf dist_data_demo.tar.gz -# preprocess demo dataset -python dist_preprocess.py diff --git a/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py b/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py index 
6a4c801c0233384decfa8035d8d967257088d775..afad881b48270ac00443ad5ce6273fa3d8216862 100644 --- a/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py +++ b/PaddleRec/ctr/dcn/dist_data/dist_preprocess.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import os @@ -21,7 +19,6 @@ TEST_DIR = 'dist_test_valid_data' TRAIN_FILE = os.path.join(TRAIN_DIR, 'tr') TEST_FILE = os.path.join(TEST_DIR, 'ev') SPLIT_RATIO = 0.9 -LINE_NUMS = "line_nums.log" FREQ_THR = 10 INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)] diff --git a/PaddleRec/ctr/dcn/infer.py b/PaddleRec/ctr/dcn/infer.py index 25e1337db6d67cdbbae750f5e57623b145f8ab97..47e628e089e16f0286367adfb2a13aedd29156cf 100644 --- a/PaddleRec/ctr/dcn/infer.py +++ b/PaddleRec/ctr/dcn/infer.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 import logging import random @@ -46,7 +44,8 @@ def infer(): startup_program = fluid.framework.Program() test_program = fluid.framework.Program() - cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch + cur_model_path = os.path.join(args.model_output_dir, + 'epoch_' + args.test_epoch) with fluid.scope_guard(inference_scope): with fluid.framework.program_guard(test_program, startup_program): diff --git a/PaddleRec/ctr/dcn/local_train.py b/PaddleRec/ctr/dcn/local_train.py index bb8c42405281768f910dd618e1d235ec80c9cb93..29badfe46e2f8c70c38bd7b5db11a3986f25ba04 100644 --- a/PaddleRec/ctr/dcn/local_train.py +++ b/PaddleRec/ctr/dcn/local_train.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import os import random @@ -75,7 +73,8 @@ def train(args): fetch_info=['total_loss', 'avg_logloss', 'auc'], debug=False, print_period=args.print_steps) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) fluid.io.save_persistables( diff --git a/PaddleRec/ctr/dcn/network.py b/PaddleRec/ctr/dcn/network.py index f0637227b86efd02b4f5e348bd129723b8ab8936..bb29b26b3699710a5f66a5420801ea7370263723 100644 --- a/PaddleRec/ctr/dcn/network.py +++ b/PaddleRec/ctr/dcn/network.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 from __future__ import print_function, absolute_import, division import paddle.fluid as fluid from collections import OrderedDict diff --git a/PaddleRec/ctr/dcn/reader.py b/PaddleRec/ctr/dcn/reader.py index aa915f6edbaf2cda51e764645f719fa965f3bc5d..291fc988edb3683aec5ef529ec78fbd87897fc72 100644 --- a/PaddleRec/ctr/dcn/reader.py +++ b/PaddleRec/ctr/dcn/reader.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -# coding: utf-8 """ dataset and reader """ diff --git a/PaddleRec/ctr/deepfm/README.md b/PaddleRec/ctr/deepfm/README.md index 9ff01aa190114cc6e2b8cc95c9e2bcf01e82a38d..ace75e5405e001acc8ba67285a9f6c79726bb01f 100644 --- a/PaddleRec/ctr/deepfm/README.md +++ b/PaddleRec/ctr/deepfm/README.md @@ -25,7 +25,7 @@ To preprocess the raw dataset, we min-max normalize continuous features to [0, 1 Download and preprocess data: ```bash -cd data && sh download_preprocess.sh && cd .. +cd data && python download_preprocess.py && cd .. ``` After executing these commands, 3 folders "train_data", "test_data" and "aid_data" will be generated. The folder "train_data" contains 90% of the raw data, while the rest 10% is in "test_data". 
The folder "aid_data" contains a created feature dictionary "feat_dict.pkl2". @@ -58,12 +58,13 @@ We emulate distributed training on a local machine. In default, we use 2 X 2,i ### Download and preprocess distributed demo dataset This small demo dataset(a few lines from Criteo dataset) only test if distributed training can train. ```bash -cd dist_data && sh dist_data_download.sh && cd .. +cd dist_data && python dist_data_download.py && cd .. ``` ### Distributed Train and Infer Train ```bash +# 该sh不支持Windows sh cluster_train.sh ``` params of cluster_train.sh: @@ -89,7 +90,7 @@ Notes: - The first trainer(with trainer_id 0) saves model params. - After each training, pserver processes should be stop manually. You can use command below: - + >ps -ef | grep python - We use Dataset API to load data,it's only supported on Linux now. diff --git a/PaddleRec/ctr/deepfm/cluster_train.py b/PaddleRec/ctr/deepfm/cluster_train.py index 5f03fee9154efbfcf0ba38640951a569c7aa8548..5cac24191d8666ab15a3cb50af29b3af84267ff3 100644 --- a/PaddleRec/ctr/deepfm/cluster_train.py +++ b/PaddleRec/ctr/deepfm/cluster_train.py @@ -133,7 +133,7 @@ def train(): dataset.set_batch_size(args.batch_size) dataset.set_thread(args.num_thread) train_filelist = [ - args.train_data_dir + '/' + x + os.path.join(args.train_data_dir, x) for x in os.listdir(args.train_data_dir) ] @@ -156,7 +156,8 @@ def train(): fetch_info=['epoch %d batch loss' % (epoch_id + 1)], print_period=5, debug=False) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) if args.trainer_id == 0: # only trainer 0 save model diff --git a/PaddleRec/ctr/deepfm/data/download_preprocess.py b/PaddleRec/ctr/deepfm/data/download_preprocess.py new file mode 100644 index 0000000000000000000000000000000000000000..054610236c7516d1fe521c70145a64af95b25def --- /dev/null +++ b/PaddleRec/ctr/deepfm/data/download_preprocess.py @@ -0,0 +1,25 @@ +import os +import shutil +import sys + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress, download_file + +if __name__ == '__main__': + url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz" + url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2" + + print("download and extract starting...") + download_file_and_uncompress(url) + download_file(url2, "./aid_data/feat_dict_10.pkl2", True) + print("download and extract finished") + + print("preprocessing...") + os.system("python preprocess.py") + print("preprocess done") + + shutil.rmtree("raw_data") + print("done") diff --git a/PaddleRec/ctr/deepfm/data/download_preprocess.sh b/PaddleRec/ctr/deepfm/data/download_preprocess.sh deleted file mode 100644 index eed4ffe2d12d24f2d1b403306aeb7bf74aa3f1d5..0000000000000000000000000000000000000000 --- a/PaddleRec/ctr/deepfm/data/download_preprocess.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2 -O ./aid_data/feat_dict_10.pkl2 || rm -f ./aid_data/feat_dict_10.pkl2 -tar zxf dac.tar.gz >/dev/null 2>&1 -rm -f dac.tar.gz - -python preprocess.py -rm *.txt -rm -r 
raw_data diff --git a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py new file mode 100644 index 0000000000000000000000000000000000000000..63e2756db389f65c17280b85437cab69e159dbda --- /dev/null +++ b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.py @@ -0,0 +1,22 @@ +import os +import shutil +import sys + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url, savename="dist_data_demo.tar.gz") + print("download and extract finished") + + print("preprocessing...") + os.system("python preprocess_dist.py") + print("preprocess done") + + print("done") \ No newline at end of file diff --git a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh b/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh deleted file mode 100644 index 6db0dbdc92ad55cfa2fe8c653ede88b4b9e77eba..0000000000000000000000000000000000000000 --- a/PaddleRec/ctr/deepfm/dist_data/dist_data_download.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash - -# download small demo dataset -wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz -O dist_data_demo.tar.gz -tar xzvf dist_data_demo.tar.gz -# preprocess dataset -python preprocess_dist.py diff --git a/PaddleRec/ctr/deepfm/infer.py b/PaddleRec/ctr/deepfm/infer.py index 527d389cb5bf2aa19f712e88ba94fa77b1f488b3..cb798a6a6ca5cb154554c56c799f9363c3981563 100644 --- a/PaddleRec/ctr/deepfm/infer.py +++ b/PaddleRec/ctr/deepfm/infer.py @@ -25,7 +25,8 @@ def infer(): inference_scope = fluid.Scope() test_files = [ - args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir) + os.path.join(args.test_data_dir, x) + for x in os.listdir(args.test_data_dir) ] criteo_dataset = CriteoDataset() criteo_dataset.setup(args.feat_dict) @@ -34,7 +35,8 @@ def infer(): startup_program = fluid.framework.Program() test_program = fluid.framework.Program() - cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch + cur_model_path = os.path.join(args.model_output_dir, + 'epoch_' + args.test_epoch) with fluid.scope_guard(inference_scope): with fluid.framework.program_guard(test_program, startup_program): diff --git a/PaddleRec/ctr/deepfm/local_train.py b/PaddleRec/ctr/deepfm/local_train.py index d81ad518c65a27d454d3dde826b4306706bfb7b5..dbf62d9617e2eb228c4d6351da93a2b7d7bf2313 100644 --- a/PaddleRec/ctr/deepfm/local_train.py +++ b/PaddleRec/ctr/deepfm/local_train.py @@ -36,7 +36,8 @@ def train(): dataset.set_batch_size(args.batch_size) dataset.set_thread(args.num_thread) train_filelist = [ - args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir) + os.path.join(args.train_data_dir, x) + for x in os.listdir(args.train_data_dir) ] print('---------------------------------------------') @@ -50,7 +51,8 @@ def train(): fetch_info=['epoch %d batch loss' % (epoch_id + 1)], print_period=1000, debug=False) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) fluid.io.save_persistables( diff --git a/PaddleRec/ctr/dnn/README.cn.md b/PaddleRec/ctr/dnn/README.cn.md index 
54a7d55b8d9ae5183645fd8c0ff87afd5c22258b..e5b6ef400eaf20c04aed4deb687497f9c5c68784 100644 --- a/PaddleRec/ctr/dnn/README.cn.md +++ b/PaddleRec/ctr/dnn/README.cn.md @@ -29,7 +29,7 @@ pip install -r requirements.txt 下载数据集: ```bash -cd data && ./download.sh && cd .. +cd data && python download.py && cd .. ``` ## 模型 @@ -56,6 +56,7 @@ python train.py \ 本地启动一个2 trainer 2 pserver的分布式训练任务,分布式场景下训练数据会按照trainer的id进行切分,保证trainer之间的训练数据不会重叠,提高训练效率 ```bash +# 该sh不支持Windows sh cluster_train.sh ``` diff --git a/PaddleRec/ctr/dnn/README.md b/PaddleRec/ctr/dnn/README.md index 9587a2a8d9f76f90a9b9e58a9ce5bbe1c7baa138..7c4678f2596a132f23e6d29e315289883b471328 100644 --- a/PaddleRec/ctr/dnn/README.md +++ b/PaddleRec/ctr/dnn/README.md @@ -39,7 +39,7 @@ categorical features. For the test dataset, the labels are omitted. Download dataset: ```bash -cd data && ./download.sh && cd .. +cd data && python download.py && cd .. ``` ## Model @@ -73,6 +73,7 @@ In distributed training setting, training data is splited by trainer_id, so that do not overlap among trainers ```bash +# this shell script does not support Windows sh cluster_train.sh ``` diff --git a/PaddleRec/ctr/dnn/data/download.py b/PaddleRec/ctr/dnn/data/download.py new file mode 100644 index 0000000000000000000000000000000000000000..b88191ffa85cd9ce36d4b5d20e8ec30c8434e5c9 --- /dev/null +++ b/PaddleRec/ctr/dnn/data/download.py @@ -0,0 +1,28 @@ +import os +import shutil +import sys +import glob + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress + +if __name__ == '__main__': + url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz" + + print("download and extract starting...") + download_file_and_uncompress(url) + print("download and extract finished") + + if os.path.exists("raw"): + shutil.rmtree("raw") + os.mkdir("raw") + + # mv ./*.txt raw/ + files = glob.glob("*.txt") + for f in files: + shutil.move(f, "raw") + + print("done") diff --git a/PaddleRec/ctr/dnn/train.py b/PaddleRec/ctr/dnn/train.py index f63edeeb35416611ce59e4361a2cac5d70a65eea..dfb056d3eacfadcb1fa1ff7b240a776dde574cef 100644 --- a/PaddleRec/ctr/dnn/train.py +++ b/PaddleRec/ctr/dnn/train.py @@ -173,8 +173,8 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var, .format(pass_id, batch_id, loss_val / args.batch_size, auc_val, batch_auc_val)) if batch_id % 1000 == 0 and batch_id != 0: - model_dir = args.model_output_dir + '/batch-' + str( - batch_id) + model_dir = os.path.join(args.model_output_dir, + 'batch-' + str(batch_id)) if args.trainer_id == 0: fluid.io.save_persistables( executor=exe, @@ -188,7 +188,7 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var, total_time += time.time() - pass_start - model_dir = args.model_output_dir + '/pass-' + str(pass_id) + model_dir = os.path.join(args.model_output_dir, 'pass-' + str(pass_id)) if args.trainer_id == 0: fluid.io.save_persistables( executor=exe, diff --git a/PaddleRec/ctr/tools/tools.py b/PaddleRec/ctr/tools/tools.py new file mode 100644 index 0000000000000000000000000000000000000000..da34a027c027d6809603869f946499ec45edf8e6 --- /dev/null +++ b/PaddleRec/ctr/tools/tools.py @@ -0,0 +1,133 @@ +import os +import time +import shutil +import requests +import sys +import tarfile +import zipfile +import platform +import functools + +lasttime = time.time() +FLUSH_INTERVAL = 0.1 + +LOCAL_PATH = 
os.path.dirname(os.path.abspath(__file__)) + + +def get_platform(): + return platform.platform() + + +def is_windows(): + return get_platform().lower().startswith("windows") + + +def progress(str, end=False): + global lasttime + if end: + str += "\n" + lasttime = 0 + if time.time() - lasttime >= FLUSH_INTERVAL: + sys.stdout.write("\r%s" % str) + lasttime = time.time() + sys.stdout.flush() + + +def download_file(url, savepath, print_progress): + r = requests.get(url, stream=True) + total_length = r.headers.get('content-length') + + if total_length is None: + with open(savepath, 'wb') as f: + shutil.copyfileobj(r.raw, f) + else: + with open(savepath, 'wb') as f: + dl = 0 + total_length = int(total_length) + starttime = time.time() + if print_progress: + print("Downloading %s" % os.path.basename(savepath)) + for data in r.iter_content(chunk_size=4096): + dl += len(data) + f.write(data) + if print_progress: + done = int(50 * dl / total_length) + progress("[%-50s] %.2f%%" % + ('=' * done, float(100 * dl) / total_length)) + if print_progress: + progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True) + + +def _uncompress_file(filepath, extrapath, delete_file, print_progress): + if print_progress: + print("Uncompress %s" % os.path.basename(filepath)) + + if filepath.endswith("zip"): + handler = _uncompress_file_zip + elif filepath.endswith("tgz"): + handler = _uncompress_file_tar + else: + handler = functools.partial(_uncompress_file_tar, mode="r") + + for total_num, index, rootpath in handler(filepath, extrapath): + if print_progress: + done = int(50 * float(index) / total_num) + progress("[%-50s] %.2f%%" % + ('=' * done, float(100 * index) / total_num)) + if print_progress: + progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True) + + if delete_file: + os.remove(filepath) + + return rootpath + + +def _uncompress_file_zip(filepath, extrapath): + files = zipfile.ZipFile(filepath, 'r') + filelist = files.namelist() + rootpath = filelist[0] + total_num = len(filelist) + for index, file in enumerate(filelist): + files.extract(file, extrapath) + yield total_num, index, rootpath + files.close() + yield total_num, index, rootpath + + +def _uncompress_file_tar(filepath, extrapath, mode="r:gz"): + files = tarfile.open(filepath, mode) + filelist = files.getnames() + total_num = len(filelist) + rootpath = filelist[0] + for index, file in enumerate(filelist): + files.extract(file, extrapath) + yield total_num, index, rootpath + files.close() + yield total_num, index, rootpath + + +def download_file_and_uncompress(url, + savepath=None, + savename=None, + extrapath=None, + print_progress=True, + cover=False, + delete_file=False): + if savepath is None: + savepath = "." + + if extrapath is None: + extrapath = "." 
+ + if savename is None: + savename = url.split("/")[-1] + savepath = os.path.join(savepath, savename) + + if cover: + if os.path.exists(savepath): + os.remove(savepath) + + if not os.path.exists(savepath): + download_file(url, savepath, print_progress) + _ = _uncompress_file(savepath, extrapath, delete_file, print_progress) diff --git a/PaddleRec/ctr/xdeepfm/README.md b/PaddleRec/ctr/xdeepfm/README.md index cf759ec0190c6136aa41009864361ddd2c23ed49..57341fc10e62ae1b5be80f21546a6d31d8610cde 100644 --- a/PaddleRec/ctr/xdeepfm/README.md +++ b/PaddleRec/ctr/xdeepfm/README.md @@ -8,7 +8,7 @@ demo数据集,在data目录下执行命令,下载数据 ```bash -sh download.sh +python download.py ``` ## 环境 @@ -39,6 +39,7 @@ test_epoch设置加载第10轮训练的模型。 数据下载同上面命令。 ```bash +# 该sh不支持Windows sh cluster_train.sh ``` 参数说明: @@ -64,7 +65,7 @@ python infer.py --model_output_dir cluster_model --test_epoch 10 --use_gpu=0 - 0号trainer保存模型参数 - 每次训练完成后需要手动停止pserver进程,使用以下命令查看pserver进程: - + >ps -ef | grep python - 数据读取使用dataset模式,目前仅支持运行在Linux环境下 diff --git a/PaddleRec/ctr/xdeepfm/cluster_train.py b/PaddleRec/ctr/xdeepfm/cluster_train.py index 0c2b4ea7e9cb34383095bdf0feab37481d5ca2e3..727e921cf6cb9db097a1ae2fb5e6947e93281eb6 100644 --- a/PaddleRec/ctr/xdeepfm/cluster_train.py +++ b/PaddleRec/ctr/xdeepfm/cluster_train.py @@ -139,7 +139,7 @@ def train(): dataset.set_pipe_command('python criteo_reader.py') dataset.set_batch_size(args.batch_size) dataset.set_filelist([ - args.train_data_dir + '/' + x + os.path.join(args.train_data_dir, x) for x in os.listdir(args.train_data_dir) ]) @@ -161,7 +161,8 @@ def train(): fetch_info=['loss', 'auc'], debug=False, print_period=args.print_steps) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) if args.trainer_id == 0: # only trainer 0 save model diff --git a/PaddleRec/ctr/xdeepfm/data/download.py b/PaddleRec/ctr/xdeepfm/data/download.py new file mode 100644 index 0000000000000000000000000000000000000000..4b21696704d28f97cc152e1d4d9ffa96c61c6854 --- /dev/null +++ b/PaddleRec/ctr/xdeepfm/data/download.py @@ -0,0 +1,28 @@ +import os +import shutil +import sys + +LOCAL_PATH = os.path.dirname(os.path.abspath(__file__)) +TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools") +sys.path.append(TOOLS_PATH) + +from tools import download_file_and_uncompress, download_file + +if __name__ == '__main__': + url_train = "https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr" + url_test = "https://paddlerec.bj.bcebos.com/xdeepfm%2Fev" + + train_dir = "train_data" + test_dir = "test_data" + + if not os.path.exists(train_dir): + os.mkdir(train_dir) + if not os.path.exists(test_dir): + os.mkdir(test_dir) + + print("download and extract starting...") + download_file(url_train, "./train_data/tr", True) + download_file(url_test, "./test_data/ev", True) + print("download and extract finished") + + print("done") diff --git a/PaddleRec/ctr/xdeepfm/data/download.sh b/PaddleRec/ctr/xdeepfm/data/download.sh deleted file mode 100644 index 95438938b6fd701c55b316faf9716fc4d11de8a7..0000000000000000000000000000000000000000 --- a/PaddleRec/ctr/xdeepfm/data/download.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -if [ ! -d "train_data" ]; then - mkdir train_data -fi - -if [ ! 
-d "test_data" ]; then - mkdir test_data -fi - -wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Fev -O ./test_data/ev -wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr -O ./train_data/tr diff --git a/PaddleRec/ctr/xdeepfm/infer.py b/PaddleRec/ctr/xdeepfm/infer.py index dbac3579188d165be002827541cf16112e2b66d2..1b89a028042c6ec39fb62b1427ecbd2a105b3639 100644 --- a/PaddleRec/ctr/xdeepfm/infer.py +++ b/PaddleRec/ctr/xdeepfm/infer.py @@ -26,7 +26,8 @@ def infer(): inference_scope = fluid.Scope() test_files = [ - args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir) + os.path.join(args.test_data_dir, x) + for x in os.listdir(args.test_data_dir) ] criteo_dataset = CriteoDataset() test_reader = paddle.batch( @@ -34,7 +35,8 @@ def infer(): startup_program = fluid.framework.Program() test_program = fluid.framework.Program() - cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch + cur_model_path = os.path.join(args.model_output_dir, + 'epoch_' + args.test_epoch) with fluid.scope_guard(inference_scope): with fluid.framework.program_guard(test_program, startup_program): diff --git a/PaddleRec/ctr/xdeepfm/local_train.py b/PaddleRec/ctr/xdeepfm/local_train.py index d53dc882c65b8e508dff68512c01c3ee5bfdc5d6..32c02f9ccacb12f96ff87131057069dd791dd808 100644 --- a/PaddleRec/ctr/xdeepfm/local_train.py +++ b/PaddleRec/ctr/xdeepfm/local_train.py @@ -4,6 +4,7 @@ import paddle.fluid as fluid import sys import network_conf import time +import utils def train(): @@ -25,7 +26,8 @@ def train(): dataset.set_pipe_command('python criteo_reader.py') dataset.set_batch_size(args.batch_size) dataset.set_filelist([ - args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir) + os.path.join(args.train_data_dir, x) + for x in os.listdir(args.train_data_dir) ]) if args.use_gpu == 1: @@ -46,7 +48,8 @@ def train(): fetch_info=['loss', 'auc'], debug=False, print_period=args.print_steps) - model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1) + model_dir = os.path.join(args.model_output_dir, + 'epoch_' + str(epoch_id + 1)) sys.stderr.write('epoch%d is finished and takes %f s\n' % ( (epoch_id + 1), time.time() - start)) fluid.io.save_persistables(