diff --git a/doc/CTR_PREDICTION.md b/doc/CTR_PREDICTION.md
old mode 100644
new mode 100755
index 1b82e1919d5eeaf1e318a2204120f4677e8c4a61..101433530727abc4abcc2b17c7f6bc9938a85f5f
--- a/doc/CTR_PREDICTION.md
+++ b/doc/CTR_PREDICTION.md
@@ -1,3 +1,326 @@
-# CTR Prediction Model
-
-Original model address:
+# CTR Prediction Model
+
+## 1. Background
+
+In search, recommendation, online advertising and similar business scenarios, the embedding parameters of a model are often enormous, reaching hundreds of GB or even TB scale. Training a model of this size requires multi-node distributed training, with the parameters sharded for update and storage. On the other hand, once trained, such a model cannot be loaded on a single machine when deployed to serve business traffic, so distributed lookup is needed there as well. Paddle Serving provides a large-scale sparse parameter service: users can conveniently host very large sparse parameters in a kv service, and the prediction service only needs to fetch the required subset of parameters from that service before executing the rest of the prediction flow.
+
+Taking a CTR prediction model as an example, this document demonstrates how Paddle Serving works with the large-scale sparse parameter service. For details of the model itself, please refer to the [original model](https://github.com/PaddlePaddle/models/tree/v1.5/PaddleRec/ctr).
+
+According to the [dataset](https://www.kaggle.com/c/criteo-display-ad-challenge/data) description, the raw model input consists of 13 integer features and 26 categorical features. In our model, the 13 integer features are fed together as one dense feature into a single data layer, while each of the 26 categorical features is fed as a separate feature into its own data layer. In addition, the label is passed in as one more feature so that the AUC metric can be computed.
+
+With the default training parameters, the embedding table of this model has 1,000,000 rows with an embedding size of 10, i.e. the parameter matrix is a 1000000 x 10 float matrix, occupying 1000000 x 10 x sizeof(float) ≈ 38MB of memory in total. **In real-world settings the embedding parameters are much larger; this demo is for demonstration purposes only.**
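+
+As a quick check of the 38MB figure, the footprint follows directly from the table shape. A minimal sketch (the variable names here are ours, not from the training script):
+
+```python
+# Rough memory footprint of the demo embedding table: a 1000000 x 10 float matrix.
+vocab_size = 1000000    # rows, i.e. the sparse feature dimension
+embedding_size = 10     # width of each embedding vector
+bytes_per_float = 4     # sizeof(float)
+
+total_bytes = vocab_size * embedding_size * bytes_per_float
+print("embedding table: %.1f MB" % (total_bytes / (1024.0 * 1024.0)))  # -> 38.1 MB
+```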
+
+## 2. Model pruning
+
+As of this writing ([v1.5](https://github.com/PaddlePaddle/models/tree/v1.5)), the training script uses PaddlePaddle's py_reader to speed up sample reading, so the program contains py_reader-related OPs, and training saves only the model parameters rather than the program itself, which therefore cannot be used for prediction directly. Second, the fetch targets of the original network are auc and batch_auc, while a deployed model only needs the per-sample predict tensor, so the fetch target of the network has to be changed to predict. Third, to demonstrate the sparse parameter service, the lookup_table OPs contained in the embedding layers have to be removed from the inference program, making the output variables of the embedding layers network inputs with corresponding feed OPs added, so that at prediction time, after the embeddings have been fetched from the sparse parameter service, they can be fed directly into the embedding layers' output variables.
+
+With the above in mind, the original program needs to be pruned. The overall process is:
+
+1) Remove the py_reader-related code and switch to fluid's built-in reader and DataFeeder
+2) Modify the original network config to make predict the fetch target
+3) Modify the original network config to make the outputs of the embedding layers of the 26 sparse inputs the feed targets, to work with the sparse parameter service
+4) Train the modified network for 1 batch, then call `fluid.io.save_inference_model()` to obtain the pruned model program
+5) Post-process the pruned program with python to remove the lookup_table OPs of the embedding layers. This is necessary because current Paddle Fluid does not prune the embedding lookup_table OPs away in step 4's `save_inference_model()`. If these OPs were kept, each embedding output variable would have 2 upstream OPs: the feed OP we want to add, and the lookup_table; since the lookup_table then has no input, the output written by the feed OP would be overwritten, causing errors
+6) Save the program obtained in step 4 together with the model parameters saved by distributed training (everything except the embedding table) in one place, forming the complete model for prediction
+
+After steps 1) - 5), the pruned network looks like this:
+
+![Pruned CTR prediction network](doc/pruned-ctr-network.png)
+
+
+The pruning steps are described in detail below:
+
+### 2.1 Remove py_reader
+
+Build the inference program by calling ctr_dnn_model() with `use_py_reader=False`, which skips all py_reader-related code inside ctr_dnn_model.
+
+Before:
+```python
+def train():
+    args = parse_args()
+
+    if not os.path.isdir(args.model_output_dir):
+        os.mkdir(args.model_output_dir)
+
+    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
+    ...
+```
+
+After:
+```python
+def train():
+    args = parse_args()
+
+    if not os.path.isdir(args.model_output_dir):
+        os.mkdir(args.model_output_dir)
+
+    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim, use_py_reader=False)
+    ...
+```
+
+
+### 2.2 Modify feed targets and fetch targets
+
+As noted at the beginning of section 2, to make the program suitable for demonstrating the sparse parameter service, we prune it so that the feed variable list and fetch variables of `ctr_dnn_model` are changed to:
+
+1) The inputs of the inference program become the output variables of the embedding layers corresponding to the 26 sparse inputs
+2) The fetch targets return predict, instead of auc_var and batch_auc_var
+
+As of this writing, `ctr_dnn_model` in the original network config (network_conf.py) is as follows:
+
+```python
+def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
+
+    def embedding_layer(input):
+        emb = fluid.layers.embedding(
+            input=input,
+            is_sparse=True,
+            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
+            # if you want to set is_distributed to True
+            is_distributed=False,
+            size=[sparse_feature_dim, embedding_size],
+            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
+                                       initializer=fluid.initializer.Uniform()))
+        return fluid.layers.sequence_pool(input=emb, pool_type='average') # modification point 1
+
+    dense_input = fluid.layers.data(
+        name="dense_input", shape=[dense_feature_dim], dtype='float32')
+
+    sparse_input_ids = [
+        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
+        for i in range(1, 27)]
+
+    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+
+    words = [dense_input] + sparse_input_ids + [label]
+
+    py_reader = None
+    if use_py_reader:
+        py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
+                                                          feed_list=words,
+                                                          name='py_reader',
+                                                          use_double_buffer=True)
+        words = fluid.layers.read_file(py_reader)
+
+    sparse_embed_seq = list(map(embedding_layer, words[1:-1])) # modification point 2
+    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
+
+    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(concated.shape[1]))))
+    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(fc1.shape[1]))))
+    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(fc2.shape[1]))))
+    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
+                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                                  scale=1 / math.sqrt(fc3.shape[1]))))
+
+    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
+    avg_cost = fluid.layers.reduce_sum(cost)
+    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
+    auc_var, batch_auc_var, auc_states = \
+        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)
+
+    return avg_cost, auc_var, batch_auc_var, py_reader, words # modification point 3
+```
+
+After:
+
+```python
+def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
+    def embedding_layer(input):
+        emb = fluid.layers.embedding(
+            input=input,
+            is_sparse=True,
+            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
+            # if you want to set is_distributed to True
+            is_distributed=False,
+            size=[sparse_feature_dim, embedding_size],
+            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
+                                       initializer=fluid.initializer.Uniform()))
+        seq = fluid.layers.sequence_pool(input=emb, pool_type='average')
+        return emb, seq # corresponds to modification point 1
+    dense_input = fluid.layers.data(
+        name="dense_input", shape=[dense_feature_dim], dtype='float32')
+    sparse_input_ids = [
+        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
+        for i in range(1, 27)]
+    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    words = [dense_input] + sparse_input_ids + [label]
+    sparse_embed_and_seq = list(map(embedding_layer, words[1:-1]))
+
+    emb_list = [x[0] for x in sparse_embed_and_seq] # corresponds to modification point 2
+    sparse_embed_seq = [x[1] for x in sparse_embed_and_seq]
+
+    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
+
+    train_feed_vars = words # corresponds to modification point 2
+    inference_feed_vars = emb_list + words[0:1]
+
+    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(concated.shape[1]))))
+    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(fc1.shape[1]))))
+    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
+                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                              scale=1 / math.sqrt(fc2.shape[1]))))
+    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
+                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                                  scale=1 / math.sqrt(fc3.shape[1]))))
+    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
+    avg_cost = fluid.layers.reduce_sum(cost)
+    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
+    auc_var, batch_auc_var, auc_states = \
+        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)
+    fetch_vars = [predict]
+
+    # corresponds to modification point 3
+    return avg_cost, auc_var, batch_auc_var, train_feed_vars, inference_feed_vars, fetch_vars
+```
+
+Notes:
+
+1) Modification point 1 makes embedding_layer() return the embedding output variable `emb` in addition to the sequence_pool result
+2) Modification point 2 collects the embedding layer outputs into `emb_list`, which is then used to build `inference_feed_vars`, used to specify the feed variable list when calling `save_inference_model()`
+3) Modification point 3 returns `words` as the feed variable list for training (`train_feed_vars`), the embedding layers' output variables as the feed variable list for inference (`inference_feed_vars`), and `predict` as the fetch target (`fetch_vars`). `inference_feed_vars` and `fetch_vars` are used to specify the feed variable list and the fetch target list when calling `fluid.io.save_inference_model()`
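+
+Since the return signature of `ctr_dnn_model` has changed, the call site in train() (shown in section 2.1) must be updated to match, and the variable lists passed on to the modified train_loop() of section 2.3. A minimal sketch of what the updated call might look like; the surrounding plumbing inside train() is not part of this document, so treat the details as illustrative:
+
+```python
+def train():
+    args = parse_args()
+
+    if not os.path.isdir(args.model_output_dir):
+        os.mkdir(args.model_output_dir)
+
+    # ctr_dnn_model() now also returns the three variable lists needed for pruning
+    loss, auc_var, batch_auc_var, train_feed_vars, inference_feed_vars, fetch_vars = \
+        ctr_dnn_model(args.embedding_size, args.sparse_feature_dim, use_py_reader=False)
+
+    # hand the variable lists through to the modified train_loop() of section 2.3
+    train_loop(args, fluid.default_main_program(), train_feed_vars,
+               inference_feed_vars, fetch_vars, loss, auc_var, batch_auc_var,
+               trainer_num=1, trainer_id=0)  # single-node values, for illustration
+```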
+
+
+### 2.3 Save the pruned program with fluid.io.save_inference_model()
+
+`fluid.io.save_inference_model()` not only saves the model parameters, but can also prune the program according to the given feed variable list and fetch target list, producing a program suitable for inference. Roughly, the principle is: starting from the fetch target list, walk the network backwards to find the OPs it depends on, add each such OP's input variables to the search list, and keep searching backwards recursively until the feed variable list is reached; OPs never visited in this traversal are pruned away.
+
+Section 2.2 already gives us `inference_feed_vars` and `fetch_vars`; all that remains is to replace the periodic saving of model parameters during training with a call to `fluid.io.save_inference_model()`:
+
+Before:
+
+```python
+def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
+               trainer_num, trainer_id):
+
+    # ...omitted...
+    for pass_id in range(args.num_passes):
+        pass_start = time.time()
+        batch_id = 0
+        py_reader.start()
+
+        try:
+            while True:
+                # pe is the ParallelExecutor created in the omitted setup code
+                loss_val, auc_val, batch_auc_val = pe.run(fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
+                loss_val = np.mean(loss_val)
+                auc_val = np.mean(auc_val)
+                batch_auc_val = np.mean(batch_auc_val)
+
+                logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
+                            .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
+                if batch_id % 1000 == 0 and batch_id != 0:
+                    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
+                    if args.trainer_id == 0:
+                        fluid.io.save_persistables(executor=exe, dirname=model_dir,
+                                                   main_program=fluid.default_main_program())
+                batch_id += 1
+        except fluid.core.EOFException:
+            py_reader.reset()
+        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
+    # ...omitted...
+```
+
+After:
+
+```python
+def train_loop(args,
+               train_program,
+               train_feed_vars,
+               inference_feed_vars,  # feed variable list used to prune the program
+               fetch_vars,           # fetch variable list used to prune the program
+               loss,
+               auc_var,
+               batch_auc_var,
+               trainer_num,
+               trainer_id):
+    # py_reader has been removed, so use fluid's built-in DataFeeder instead
+    dataset = reader.CriteoDataset(args.sparse_feature_dim)
+    train_reader = paddle.batch(
+        paddle.reader.shuffle(
+            dataset.train([args.train_data_path], trainer_num, trainer_id),
+            buf_size=args.batch_size * 100),
+        batch_size=args.batch_size)
+
+    inference_feed_var_names = [var.name for var in inference_feed_vars]
+
+    place = fluid.CPUPlace()
+    exe = fluid.Executor(place)
+    exe.run(fluid.default_startup_program())
+    total_time = 0
+    pass_id = 0
+    batch_id = 0
+
+    feed_var_names = [var.name for var in train_feed_vars]
+    feeder = fluid.DataFeeder(feed_var_names, place)
+
+    for data in train_reader():
+        loss_val, auc_val, batch_auc_val = exe.run(fluid.default_main_program(),
+                                                   feed=feeder.feed(data),
+                                                   fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
+        # same path that section 2.4 reads the program back from
+        model_dir = args.model_output_dir + '/inference_only'
+        fluid.io.save_inference_model(model_dir,
+                                      inference_feed_var_names,
+                                      fetch_vars,
+                                      exe,
+                                      fluid.default_main_program())
+        break  # we only need the pruned program and model parameters, so train just 1 batch and stop
+    loss_val = np.mean(loss_val)
+    auc_val = np.mean(auc_val)
+    batch_auc_val = np.mean(batch_auc_val)
+    logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
+                .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
+```
+
+### 2.4 Post-process the inference program with python to remove the lookup_table OPs
+
+This step works around the fact that `fluid.io.save_inference_model()` does not remove the lookup_table OPs when pruning the program. The `save_inference_model` interface is expected to be improved in the future, at which point this step may no longer be necessary.
+
+The main code:
+
+```python
+import six
+from paddle.fluid.proto import framework_pb2
+
+def prune_program():
+    # load the saved inference program from disk
+    args = parse_args()  # parse_args() comes from the training scripts
+    model_dir = args.model_output_dir + "/inference_only"
+    model_file = model_dir + "/__model__"
+    with open(model_file, "rb") as f:
+        protostr = f.read()
+
+    # deserialize into a protobuf ProgramDesc message
+    proto = framework_pb2.ProgramDesc.FromString(six.binary_type(protostr))
+
+    # walk through all OPs and drop the lookup_table OPs
+    block = proto.blocks[0]
+    kept_ops = [op for op in block.ops if op.type != "lookup_table"]
+    del block.ops[:]
+    block.ops.extend(kept_ops)
+
+    # save the modified program
+    with open(model_file + ".pruned", "wb") as f:
+        f.write(proto.SerializePartialToString())
+```
+
+### 2.5 Chaining the pruning steps together
+
+We provide save_program.py, a complete script for pruning the CTR prediction model. It is released together with the [CTR distributed training](doc/DISTRIBUTED_TRAINING_AND_SERVING.md) example and can be found in the directory from which the training scripts are launched on the trainer and pserver nodes.
+
+## 3. The end-to-end prediction flow
+
+Client side:
+1) Dense feature: read the 13 integer features from each dataset sample and assemble them into 1 dense feature
+2) Sparse feature: read the 26 categorical features from each dataset sample and sign each of them with hash(str(feature_index) + feature_string) to obtain a feature id, forming 26 sparse features
+
+Serving side:
+1) Dense feature: the dense feature is 13 float numbers, fed directly into the LodTensor of the network's dense_input variable
+2) Sparse feature: for each of the 26 sparse feature ids, fetch the corresponding embedding vector from the kv service and feed it into the output variable of the corresponding embedding layer. In the pruned program, these variables are named embedding_0.tmp_0, embedding_1.tmp_0, ... embedding_25.tmp_0 respectively
+3) Run prediction and fetch the predict result
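+
+To make the client-side processing concrete, below is a minimal sketch of turning one raw Criteo record into the dense and sparse inputs described above. The hash signature is the one given in step 2; the tab-separated field layout (label, 13 integer features, 26 categorical features) and the modulo by the sparse feature dimension follow the original model's dataset reader, while the dense-feature normalization applied there is omitted here for brevity:
+
+```python
+sparse_feature_dim = 1000000  # must match the dimension used at training time
+
+def process_line(line):
+    # one raw record: label \t 13 integer features \t 26 categorical features
+    fields = line.rstrip('\n').split('\t')
+    label = int(fields[0])
+
+    # 13 integer features -> 1 dense feature vector (empty values as 0.0)
+    dense_feature = [float(x) if x else 0.0 for x in fields[1:14]]
+
+    # 26 categorical features -> 26 sparse feature ids, each signed by
+    # hash(str(feature_index) + feature_string) and bounded by the embedding rows
+    sparse_features = [hash(str(idx) + fields[idx]) % sparse_feature_dim
+                       for idx in range(14, 40)]
+
+    return label, dense_feature, sparse_features
+
+# On the serving side, the embedding vector looked up from the kv service for
+# the i-th sparse feature is fed to the variable named "embedding_%d.tmp_0" % i.
+```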
diff --git a/doc/pruned-ctr-network.png b/doc/pruned-ctr-network.png
new file mode 100755
index 0000000000000000000000000000000000000000..dd5c31a0a9dacd90c6c0019be1a49921f9900ff5
Binary files /dev/null and b/doc/pruned-ctr-network.png differ