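# CTR Prediction Model

## 1. Background

In search, recommendation, and online advertising, embedding parameters are often enormous, reaching hundreds of GB or even the TB scale. Training a model of this size requires distributed multi-machine training, in which the parameters are sharded for both updating and storage. Likewise, a trained model of this size is hard to load on a single machine for online serving. Paddle Serving provides a read/write service for large-scale sparse parameters: users can host very large sparse parameters in the parameter service as key-value pairs, and online prediction only needs to read back the subset of parameters it requires before running the rest of the prediction flow.

We use a CTR prediction model as an example to show how to use the large-scale sparse parameter service in Paddle Serving. For model details, see the [original model](https://github.com/PaddlePaddle/models/tree/v1.5/PaddleRec/ctr).

According to the [dataset description](https://www.kaggle.com/c/criteo-display-ad-challenge/data), the model's raw input consists of 13 integer features and 26 categorical features. In our model, the 13 integer features are fed together as one dense feature into a single data layer, while each of the 26 categorical features is fed into its own data layer. In addition, the label is fed as a feature so that the AUC metric can be computed.

With the default training parameters, the embedding table has 1,000,000 rows (the sparse feature dimension) and an embedding size of 10, so the parameter matrix is a 1000000 x 10 float matrix occupying 1000000 x 10 x sizeof(float) = 40,000,000 bytes (about 38 MiB) of memory. **In real-world scenarios the embedding parameters are far larger, so this demo is for demonstration purposes only.**

## 2. Model pruning

At the time of writing ([v1.5](https://github.com/PaddlePaddle/models/tree/v1.5)), the training script uses PaddlePaddle's py_reader to speed up sample reading, so the program contains py_reader-related OPs. Moreover, training saves only the model parameters and not the program, so the saved parameters cannot be loaded directly by the inference library. In addition, the final output tensors of the original network are auc and batch_auc, whereas for prediction the model only needs each sample's predict, so the output tensor must be changed to predict. Finally, to demonstrate the sparse parameter service, we deliberately remove the lookup_table OPs contained in the embedding layers from the inference program, use the output variables of the embedding layers as the network's inputs, and add the corresponding feed OPs. At prediction time we can then fetch the embedding vectors from the sparse parameter service and feed them directly into each embedding layer's output variable.

For these reasons, we need to prune the original program. The overall process is:

1) Remove the py_reader code and use fluid's built-in reader and DataFeeder instead
2) Modify the original network configuration so that the predict variable is the fetch target
3) Modify the original network configuration so that the outputs of the 26 sparse-parameter embedding layers are the feed targets, for use with the sparse parameter service later
4) Train the modified network locally for 1 batch, then call `fluid.io.save_inference_model()` to obtain the pruned model program
5) Post-process the pruned program in Python to remove the lookup_table OPs of the embedding layers. This is needed because, in the current Paddle Fluid, `save_inference_model()` in step 4 does not prune cleanly and keeps the embedding lookup_table OPs. If these OPs are not removed, each embedding output variable ends up with two producing OPs: the feed OP (which we add) and lookup_table. Because lookup_table has no input, its output and the feed OP's output overwrite each other, producing corrupted results
6) Save the program obtained in step 4 together with the model parameters saved by distributed training (excluding the embedding) to form the complete inference model

After steps 1) through 5), the pruned model network looks like this:

![Pruned CTR prediction network](doc/pruned-ctr-network.png)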
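The conflict described in step 5) can be detected mechanically by counting, for each variable, how many OPs write to it. The sketch below is a hypothetical helper, not part of Paddle: it models a program as a list of `(op_type, output_names)` pairs instead of a real `ProgramDesc`, and the variable name `pool_0` is made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple

# Toy model of a program: each OP is (op_type, names of the variables it writes).
OpSpec = Tuple[str, Sequence[str]]


def find_multi_writer_vars(ops: Sequence[OpSpec]) -> Dict[str, List[str]]:
    """Return {variable name: [types of the OPs writing it]} for every
    variable that is written by more than one OP."""
    writers: Dict[str, List[str]] = defaultdict(list)
    for op_type, outputs in ops:
        for name in outputs:
            writers[name].append(op_type)
    return {name: types for name, types in writers.items() if len(types) > 1}


# Program after step 4 plus an added feed OP, before step 5 removes lookup_table.
before_step5 = [
    ("feed", ["embedding_0.tmp_0"]),
    ("lookup_table", ["embedding_0.tmp_0"]),
    ("sequence_pool", ["pool_0"]),
]
# Step 5: drop every lookup_table OP.
after_step5 = [op for op in before_step5 if op[0] != "lookup_table"]
```

Here `find_multi_writer_vars(before_step5)` reports `embedding_0.tmp_0` as written by both `feed` and `lookup_table`, while `after_step5` reports nothing. The same check could be applied to a real program once each OP's output variable names have been extracted.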
The pruning steps are described in detail below.

### 2.1 Removing py_reader from the network configuration

When building the inference program, pass the `use_py_reader=False` argument to `ctr_dnn_model()`. This removes the py_reader-related code inside the `ctr_dnn_model` definition.

Before:

```python
def train():
    args = parse_args()
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    ...
```

After:

```python
def train():
    args = parse_args()
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    # With use_py_reader=False, ctr_dnn_model skips the py_reader code (py_reader is None).
    # Note: the version of ctr_dnn_model modified in section 2.2 returns six values instead.
    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim,
                                                               use_py_reader=False)
    ...
```

### 2.2 Changing the feed targets and fetch targets in the network configuration

As described at the beginning of section 2, to make the program suitable for demonstrating sparse parameters, we prune it by changing the feed variable list and the fetch variables in `ctr_dnn_model`:

1) In the inference program, the inputs for the 26 sparse features become the output variables of the corresponding embedding layers
2) The fetch target returns predict instead of auc_var and batch_auc_var

At the time of writing, the original network configuration `ctr_dnn_model` (in network_conf.py) is defined as follows:
```python
import math

import paddle.fluid as fluid

dense_feature_dim = 13  # the 13 integer features are fed as one dense input


def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):

    def embedding_layer(input):
        emb = fluid.layers.embedding(
            input=input,
            is_sparse=True,
            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
            # if you want to set is_distributed to True
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
                                       initializer=fluid.initializer.Uniform()))
        return fluid.layers.sequence_pool(input=emb, pool_type='average')  # change 1

    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]

    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    words = [dense_input] + sparse_input_ids + [label]

    py_reader = None
    if use_py_reader:
        py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
                                                          feed_list=words,
                                                          name='py_reader',
                                                          use_double_buffer=True)
        words = fluid.layers.read_file(py_reader)

    sparse_embed_seq = list(map(embedding_layer, words[1:-1]))  # change 2
    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)

    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                                  scale=1 / math.sqrt(fc3.shape[1]))))

    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)

    return avg_cost, auc_var, batch_auc_var, py_reader, words  # change 3
```

After the modification:
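```python
import math

import paddle.fluid as fluid

dense_feature_dim = 13  # the 13 integer features are fed as one dense input


def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):

    def embedding_layer(input):
        emb = fluid.layers.embedding(
            input=input,
            is_sparse=True,
            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
            # if you want to set is_distributed to True
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
                                       initializer=fluid.initializer.Uniform()))
        seq = fluid.layers.sequence_pool(input=emb, pool_type='average')
        return emb, seq  # corresponds to change 1 above

    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]

    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    words = [dense_input] + sparse_input_ids + [label]

    sparse_embed_and_seq = list(map(embedding_layer, words[1:-1]))

    emb_list = [x[0] for x in sparse_embed_and_seq]  # corresponds to change 2 above
    sparse_embed_seq = [x[1] for x in sparse_embed_and_seq]

    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)

    train_feed_vars = words  # corresponds to change 2 above
    inference_feed_vars = emb_list + words[0:1]

    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                                  scale=1 / math.sqrt(fc3.shape[1]))))

    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)

    fetch_vars = [predict]  # corresponds to change 3 above

    return avg_cost, auc_var, batch_auc_var, train_feed_vars, inference_feed_vars, fetch_vars
```

Notes:

1) At change 1, `embedding_layer` also returns the embedding layer's output variable
2) At change 2, the embedding layers' output variables are collected in `emb_list`, which in turn goes into `inference_feed_vars`; this is later used as the feed variable list for `save_inference_model()`
3) At change 3, we return `words` as the training feed variable list (`train_feed_vars`), the embedding layers' output variables plus `dense_input` as the inference feed variable list (`inference_feed_vars`), and `predict` as the fetch target (`fetch_vars`). `inference_feed_vars` and `fetch_vars` specify the feed variable list and the fetch target list for `fluid.io.save_inference_model()`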
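Because the new feed targets are the outputs of `fluid.layers.embedding`, which are consumed by `sequence_pool`, whatever is fed into them has to keep the per-sample sequence (LoD) structure of the original `lod_level=1` inputs. The NumPy sketch below only models that layout and is not Paddle code: the hypothetical `pack_sequences` concatenates each sample's embedding rows and records their lengths (the "recursive sequence lengths" of a level-1 LoD), and `sequence_average_pool` mirrors `pool_type='average'`. In Paddle Fluid 1.x such a tensor would typically be built with `fluid.create_lod_tensor(data, recursive_seq_lens, place)`.

```python
from typing import List, Tuple

import numpy as np


def pack_sequences(seqs: List[np.ndarray]) -> Tuple[np.ndarray, List[int]]:
    """Concatenate per-sample [len_i, dim] arrays into one [sum(len_i), dim] array.

    The returned lengths play the role of a level-1 LoD (recursive sequence lengths).
    """
    lengths = [int(s.shape[0]) for s in seqs]
    data = np.concatenate(seqs, axis=0).astype(np.float32)
    return data, lengths


def sequence_average_pool(data: np.ndarray, lengths: List[int]) -> np.ndarray:
    """Average each sample's rows, mirroring sequence_pool(pool_type='average')."""
    offsets = np.cumsum([0] + lengths)
    return np.stack([data[offsets[i]:offsets[i + 1]].mean(axis=0)
                     for i in range(len(lengths))])
```

Since each categorical feature carries exactly one id per sample in this model, every length is 1 and the average pool returns the looked-up embedding vectors unchanged.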
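### 2.3 Saving the pruned program with fluid.io.save_inference_model()

`fluid.io.save_inference_model()` not only saves the model parameters, but also prunes the program according to the feed variable list and the fetch target list, producing a program suitable for inference. Roughly, it starts from the fetch target list and walks backward through the forward network configuration to find the OPs the targets depend on, adds each OP's inputs to the set of target variables, and recursively repeats this until all dependent OPs and variables have been found.

We already obtained `inference_feed_vars` and `fetch_vars` in section 2.2, so all that remains is to call `fluid.io.save_inference_model()` wherever the training loop saves model parameters.

Before:

```python
def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):

    # ... omitted

    for pass_id in range(args.num_passes):
        pass_start = time.time()
        batch_id = 0
        py_reader.start()

        try:
            while True:
                loss_val, auc_val, batch_auc_val = pe.run(fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
                batch_auc_val = np.mean(batch_auc_val)

                logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                            .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
                if batch_id % 1000 == 0 and batch_id != 0:
                    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                    if args.trainer_id == 0:
                        fluid.io.save_persistables(executor=exe, dirname=model_dir,
                                                   main_program=fluid.default_main_program())
                batch_id += 1
        except fluid.core.EOFException:
            py_reader.reset()
        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))

    # ... omitted
```

After:

```python
import logging

import numpy as np
import paddle
import paddle.fluid as fluid

import reader  # reader.py from the PaddleRec/ctr example

logger = logging.getLogger(__name__)


def train_loop(args,
               train_program,
               train_feed_vars,
               inference_feed_vars,  # feed variable list used to prune the program
               fetch_vars,  # fetch variable list used to prune the program
               loss,
               auc_var,
               batch_auc_var,
               trainer_num,
               trainer_id):

    # py_reader has been removed, so use fluid's built-in DataFeeder instead
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)

    inference_feed_var_names = [var.name for var in inference_feed_vars]
    # Output directory for the pruned program; prune_program() in section 2.4 reads from here
    model_dir = args.model_output_dir + "/inference_only"

    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    total_time = 0
    pass_id = 0
    batch_id = 0

    feed_var_names = [var.name for var in train_feed_vars]
    feeder = fluid.DataFeeder(feed_var_names, place)

    for data in train_reader():
        loss_val, auc_val, batch_auc_val = exe.run(fluid.default_main_program(),
                                                   feed=feeder.feed(data),
                                                   fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
        loss_val = np.mean(loss_val)
        auc_val = np.mean(auc_val)
        batch_auc_val = np.mean(batch_auc_val)
        logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                    .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))

        fluid.io.save_inference_model(model_dir, inference_feed_var_names, fetch_vars, exe,
                                      fluid.default_main_program())
        # We only need the pruned program, not the model parameters, so stop after one batch
        break
```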
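The backward traversal that `save_inference_model()` performs can be modeled in a few lines of plain Python. This is only a simplified model of the idea, not Paddle's implementation: `Op`, `prune_for_inference`, `TOY_OPS`, and the variable names other than `embedding_0.tmp_0`, `dense_input`, `predict`, and `SparseFeatFactors` are hypothetical. The optional `feed_names` argument treats feed variables as a boundary, which offers one plausible way to see why a lookup_table OP can survive pruning that starts only from the fetch targets.

```python
from typing import Iterable, List, NamedTuple, Optional, Set


class Op(NamedTuple):
    """Toy operator: its type plus the names of the variables it reads and writes."""
    type: str
    inputs: List[str]
    outputs: List[str]


def prune_for_inference(ops: List[Op], fetch_names: Iterable[str],
                        feed_names: Optional[Iterable[str]] = None) -> List[Op]:
    """Keep only the OPs that the fetch targets depend on.

    Walks the OP list backwards from the fetch targets: an OP is kept if it writes
    a variable that is still needed, and its inputs then become needed as well.
    Variables in feed_names are treated as supplied from outside, so OPs that
    only produce them are not kept.
    """
    boundary: Set[str] = set(feed_names or ())
    needed: Set[str] = set(fetch_names)
    kept: List[Op] = []
    for op in reversed(ops):
        if not any(v in needed and v not in boundary for v in op.outputs):
            continue
        kept.append(op)
        needed.update(op.inputs)
    kept.reverse()
    return kept


# Hypothetical, heavily simplified training program with a single sparse feature.
TOY_OPS = [
    Op("lookup_table", ["SparseFeatFactors", "C1"], ["embedding_0.tmp_0"]),
    Op("sequence_pool", ["embedding_0.tmp_0"], ["pool_0"]),
    Op("concat", ["pool_0", "dense_input"], ["concat_0"]),
    Op("fc", ["concat_0"], ["predict"]),
    Op("cross_entropy", ["predict", "label"], ["cost"]),
    Op("auc", ["predict", "label"], ["auc"]),
]
```

`prune_for_inference(TOY_OPS, ["predict"])` drops the loss and AUC OPs but keeps `lookup_table`, `sequence_pool`, `concat`, and `fc`. Passing `feed_names=["embedding_0.tmp_0", "dense_input"]` additionally drops `lookup_table`, which is the result that section 2.4 has to produce by hand.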
### 2.4 Post-processing the inference program in Python to remove lookup_table OPs

This step is needed because the program pruned by `fluid.io.save_inference_model()` still contains the lookup_table OPs. If the `save_inference_model` interface is improved in the future, this section can be skipped.

Main code:

```python
import six
from paddle.fluid.proto import framework_pb2


def prune_program():
    # Read the inference program from disk (parse_args() comes from the training script)
    args = parse_args()
    model_dir = args.model_output_dir + "/inference_only"
    model_file = model_dir + "/__model__"
    with open(model_file, "rb") as f:
        protostr = f.read()

    # Deserialize it into a protobuf message
    proto = framework_pb2.ProgramDesc.FromString(six.binary_type(protostr))

    # Walk all OPs of the main block and drop the lookup_table ones
    block = proto.blocks[0]
    kept_ops = [op for op in block.ops if op.type != "lookup_table"]
    del block.ops[:]
    block.ops.extend(kept_ops)

    # Save the modified program
    with open(model_file + ".pruned", "wb") as f:
        f.write(proto.SerializePartialToString())
```

### 2.5 Putting the pruning steps together

We provide save_program.py, a complete script for pruning the CTR prediction model. It is released together with the [CTR distributed training job](doc/DISTRIBUTED_TRAINING_AND_SERVING.md) and can be found in the training script directory of the trainer and pserver containers.
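Step 6) from the list at the start of section 2 (combining the pruned program with the parameters from distributed training) is not spelled out in sections 2.1 to 2.4. The sketch below is a hypothetical helper built on several assumptions: every parameter was saved as its own file named after the variable (as `fluid.io.save_persistables()` does when no `filename` is given); the parameter files that `save_inference_model()` wrote into the inference directory are exactly the ones the inference program needs; and the embedding table is named `SparseFeatFactors` and is served from the sparse parameter service, so it is skipped.

```python
import os
import shutil
from typing import Iterable, List


def assemble_inference_model(inference_dir: str, trained_param_dir: str,
                             pruned_program: str, out_dir: str,
                             skip: Iterable[str] = ("SparseFeatFactors",)) -> List[str]:
    """Copy the pruned program plus trained dense parameters into out_dir.

    Returns the names of the parameter files that were copied.
    """
    os.makedirs(out_dir, exist_ok=True)
    # load_inference_model() looks for the program file named __model__ by default.
    shutil.copyfile(pruned_program, os.path.join(out_dir, "__model__"))
    skipped = set(skip)
    copied = []
    for name in sorted(os.listdir(inference_dir)):
        # Program files (__model__, __model__.pruned) and the embedding are not copied.
        if name.startswith("__model__") or name in skipped:
            continue
        # Take the trained version of each parameter the inference program uses,
        # replacing the values from the one-batch run in section 2.3.
        src = os.path.join(trained_param_dir, name)
        if not os.path.isfile(src):
            raise FileNotFoundError("trained parameter file missing: " + src)
        shutil.copyfile(src, os.path.join(out_dir, name))
        copied.append(name)
    return copied
```

Using the file names from the inference directory, rather than copying everything from the training output, avoids carrying over optimizer and metric state that the inference program never reads.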
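## 3. The overall prediction flow

Client side:

1) Dense feature: read the 13 integer features from each sample in the dataset to form 1 dense feature
2) Sparse feature: read the 26 categorical features from each sample and sign each one with hash(str(feature_index) + feature_string) to get the feature's id, forming 26 sparse features

Serving side:

1) Dense feature: the 13 float values of the dense feature are fed together into the LoDTensor of the network's `dense_input` variable
2) Sparse feature: for each of the 26 sparse feature ids, look up the corresponding embedding vector in the kv service and feed it into the output variable of the corresponding embedding layer. In our pruned network, these 26 variables are named embedding_0.tmp_0, embedding_1.tmp_0, ..., embedding_25.tmp_0
3) Run prediction and fetch the result.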
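The client and serving steps above can be sketched end to end in plain Python. All function names here are hypothetical, and a plain `dict` stands in for the kv service. Several choices are assumptions rather than facts from this document: empty integer fields become 0.0 and no normalization is applied, the hash is taken modulo `sparse_feature_dim`, `first_index` decides which number is used as `feature_index`, and missing ids fall back to a zero vector. The sketch uses `zlib.crc32` instead of Python's built-in `hash()`, because in Python 3 `hash()` of a `str` is salted per process unless `PYTHONHASHSEED` is fixed, and client and trainer must produce identical ids.

```python
import zlib
from typing import Dict, List, Mapping, Sequence

import numpy as np

EMBEDDING_SIZE = 10  # the demo's default embedding size


def dense_feature(int_fields: Sequence[str]) -> np.ndarray:
    """Client: turn the 13 integer fields into one float32 dense feature.

    Empty fields become 0.0 (an assumption); any normalization done by the
    training reader is omitted here.
    """
    assert len(int_fields) == 13
    return np.array([float(v) if v != "" else 0.0 for v in int_fields], dtype=np.float32)


def sparse_feature_ids(cat_fields: Sequence[str], sparse_feature_dim: int,
                       first_index: int = 0) -> List[int]:
    """Client: sign each of the 26 categorical fields as hash(str(index) + value).

    crc32 stands in for the hash so ids are stable across processes; the modulo
    and first_index are assumptions and must match what training used.
    """
    assert len(cat_fields) == 26
    return [zlib.crc32((str(first_index + i) + value).encode("utf-8")) % sparse_feature_dim
            for i, value in enumerate(cat_fields)]


def build_feed(dense: np.ndarray, ids: Sequence[int],
               kv_store: Mapping[int, np.ndarray]) -> Dict[str, np.ndarray]:
    """Server: look up each id's embedding and key it by the pruned network's variable names."""
    feed = {"dense_input": dense.reshape(1, -1)}
    for i, feat_id in enumerate(ids):
        vec = kv_store.get(feat_id, np.zeros(EMBEDDING_SIZE, dtype=np.float32))
        # One id per categorical feature, so each embedding output holds a single row.
        feed["embedding_%d.tmp_0" % i] = np.asarray(vec, dtype=np.float32).reshape(1, -1)
    return feed
```

The returned dict only fixes the variable names and shapes; turning each embedding entry into a LoDTensor with one sequence of length 1 is still up to the serving code.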