CTR_PREDICTION.md 16.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
# CTR预估模型

## 1. 背景

在搜索、推荐、在线广告等业务场景中,embedding参数的规模常常非常庞大,达到数百GB甚至T级别;训练如此规模的模型需要用到多机分布式训练能力,将参数分片更新和保存;另一方面,训练好的模型,要应用于在线业务,也难以单机加载。Paddle Serving提供大规模稀疏参数读写服务,用户可以方便地将超大规模的稀疏参数以kv形式托管到参数服务,在线预测只需将所需要的参数子集从参数服务读取回来,再执行后续的预测流程。

我们以CTR预估模型为例,演示Paddle Serving中如何使用大规模稀疏参数服务。关于模型细节请参考[原始模型](https://github.com/PaddlePaddle/models/tree/v1.5/PaddleRec/ctr)

根据[对数据集的描述](https://www.kaggle.com/c/criteo-display-ad-challenge/data),该模型原始输入为13维integer features和26维categorical features。在我们的模型中,13维integer feature作为dense feature整体feed到一个data layer,而26维categorical features各自作为一个feature分别feed到一个data layer。除此之外,为计算auc指标,还将label作为一个feature输入。

若按缺省训练参数,本模型的embedding dim为100w,size为10,也就是参数矩阵为1000000 x 10的float型矩阵,实际占用内存共1000000 x 10 x sizeof(float) = 39MB;**实际场景中,embedding参数要大的多;因此该demo仅为演示使用**


## 2. 模型裁剪

在写本文档时([v1.5](https://github.com/PaddlePaddle/models/tree/v1.5)),训练脚本用PaddlePaddle py_reader加速样例读取速度,program中带有py_reader相关OP,且训练过程中只保存了模型参数,没有保存program,保存的参数没法直接用预测库加载;另外原始网络中最终输出的tensor是auc和batch_auc,而实际模型用于预测时只需要每个样例的predict,需要改掉模型的输出tensor为predict。再有,为了演示稀疏参数服务的使用,我们要有意将embedding layer包含的lookup_table OP从预测program中拿掉,以embedding layer的output variable作为网络的输入,然后再添加对应的feed OP,使得我们能够在预测时从稀疏参数服务获取到embedding向量后,将数据直接feed到各个embedding的output variable。

基于以上几方面考虑,我们需要对原始program进行裁剪。大致过程为:

1) 去掉py_reader相关代码,改为用fluid自带的reader和DataFeed
2) 修改原始网络配置,将predict变量作为fetch target
3) 修改原始网络配置,将26个稀疏参数的embedding layer的output作为feed target,以与后续稀疏参数服务配合使用
4) 修改后的网络,本地train 1个batch后,调用`fluid.io.save_inference_model()`,获得裁剪后的模型program
5) 裁剪后的program,用python再次处理,去掉embedding layer的lookup_table OP。这是因为,当前Paddle Fluid在第4步`save_inference_model()`时没有裁剪干净,还保留了embedding的lookup_table OP;如果这些OP不去除掉,那么embedding的output variable就会有2个输入OP:一个是feed OP(我们要添加的),一个是lookup_table;而lookup_table又没有输入,它的输出会与feed OP的输出互相覆盖,导致错乱
6) 第4步拿到的program,与分布式训练保存的模型参数(除embedding之外)保存到一起,形成完整的预测模型

第1) - 第5)步裁剪完毕后的模型网络配置如下:

![Pruned CTR prediction network](doc/pruned-ctr-network.png)


整个裁剪过程具体说明如下:

### 2.1 网络配置中去除py_reader

Inference program调用ctr_dnn_model()函数时添加`user_py_reader=False`参数。这会在ctr_dnn_model定义中将py_reader相关的代码去掉

修改前:
```python
def train():
    args = parse_args()

    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    ...
```

修改后:
```python
def train():
    args = parse_args()

    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim, use_py_reader=False)
    ...
```


### 2.2 网络配置中修改feed targets和fetch targets

如第2节开头所述,为了使program适合于演示稀疏参数的使用,我们要裁剪program,将`ctr_dnn_model`中feed variable list和fetch variable分别改掉:

1) Inference program中26维稀疏特征的输入改为每个特征的embedding layer的output variable
2) fetch targets中返回的是predict,取代auc_var和batch_auc_var

截至写本文时,原始的网络配置 (network_conf.py中)`ctr_dnn_model`定义如下:

```python
def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):

    def embedding_layer(input):
        emb = fluid.layers.embedding(
            input=input,
            is_sparse=True,
            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
            # if you want to set is_distributed to True
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
                                       initializer=fluid.initializer.Uniform()))
        return fluid.layers.sequence_pool(input=emb, pool_type='average')    # 需修改1

    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]

    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    words = [dense_input] + sparse_input_ids + [label]

    py_reader = None
    if use_py_reader:
        py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
                                                          feed_list=words,
                                                          name='py_reader',
                                                          use_double_buffer=True)
        words = fluid.layers.read_file(py_reader)

    sparse_embed_seq = list(map(embedding_layer, words[1:-1]))              # 需修改2
    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)

    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                                  scale=1 / math.sqrt(fc3.shape[1]))))

    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)

    return avg_cost, auc_var, batch_auc_var, py_reader, words              # 需修改3
```

修改后

```python
def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
    def embedding_layer(input):
        emb = fluid.layers.embedding(
            input=input,
            is_sparse=True,
            # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190
            # if you want to set is_distributed to True
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
                                       initializer=fluid.initializer.Uniform()))
        seq = fluid.layers.sequence_pool(input=emb, pool_type='average')
        return emb, seq                                                   # 对应上文修改处1
    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')
    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    words = [dense_input] + sparse_input_ids + [label]
    sparse_embed_and_seq = list(map(embedding_layer, words[1:-1]))
    
    emb_list = [x[0] for x in sparse_embed_and_seq]                       # 对应上文修改处2
    sparse_embed_seq = [x[1] for x in sparse_embed_and_seq]
    
    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
    
    train_feed_vars = words                                              # 对应上文修改处2
    inference_feed_vars = emb_list + words[0:1]
    
    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                                  scale=1 / math.sqrt(fc3.shape[1]))))
    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)
    fetch_vars = [predict]
    
    # 对应上文修改处3
    return avg_cost, auc_var, batch_auc_var, train_feed_vars, inference_feed_vars, fetch_vars
```

说明:

1) 修改处1,我们将embedding layer的输出变量返回
2) 修改处2,我们将embedding layer的输出变量保存到`emb_list`,后者进一步保存到`inference_feed_vars`,用来将来在`save_inference_model()`时指定feed variable list。
3) 修改处3,我们将`words`变量作为训练时的feed variable list (`train_feed_vars`),将embedding layer的output variable作为infer时的feed variable list (`inference_feed_vars`),将`predict`作为fetch target (`fetch_vars`),分别返回。`inference_feed_vars``fetch_vars`用于`fluid.io.save_inference_model()`时指定feed variable list和fetch target list


### 2.3 fluid.io.save_inference_model()保存裁剪后的program

`fluid.io.save_inference_model()`不仅保存模型参数,还能够根据feed variable list和fetch target list参数,对program进行裁剪,形成适合inference用的program。大致原理是,根据前向网络配置,从fetch target list开始,反向查找其所依赖的OP列表,并将每个OP的输入加入目标variable list,再次递归地反向找到所有依赖OP和variable list。

在2.2节中我们已经拿到所需的`inference_feed_vars``fetch_vars`,接下来只要在训练过程中每次保存模型参数时改为调用`fluid.io.save_inference_model()`

修改前:

```python
def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    
...省略
    for pass_id in range(args.num_passes):
        pass_start = time.time()
        batch_id = 0
        py_reader.start()

        try:
            while True:
                loss_val, auc_val, batch_auc_val = pe.run(fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
                batch_auc_val = np.mean(batch_auc_val)

                logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                      .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
                if batch_id % 1000 == 0 and batch_id != 0:
                    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                    if args.trainer_id == 0:
                        fluid.io.save_persistables(executor=exe, dirname=model_dir,
                                                   main_program=fluid.default_main_program())
                batch_id += 1
        except fluid.core.EOFException:
            py_reader.reset()
        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
...省略
```

修改后

```python
def train_loop(args,
               train_program,
               train_feed_vars,
               inference_feed_vars,  # 裁剪program用的feed variable list
               fetch_vars,           # 裁剪program用的fetch variable list
               loss,
               auc_var,
               batch_auc_var,
               trainer_num,
               trainer_id):
    # 因为已经将py_reader去掉,这里用fluid自带的DataFeeder
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    
    inference_feed_var_names = [var.name for var in inference_feed_vars]
    
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    total_time = 0
    pass_id = 0
    batch_id = 0

    feed_var_names = [var.name for var in feed_vars]
    feeder = fluid.DataFeeder(feed_var_names, place)

    for data in train_reader():
        loss_val, auc_val, batch_auc_val = exe.run(fluid.default_main_program(),
                                            feed = feeder.feed(data),
                                            fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
        fluid.io.save_inference_model(model_dir,
                                      inference_feed_var_names,
                                      fetch_vars,
                                      exe,
                                      fluid.default_main_program())
        break # 我们只要裁剪后的program,不需要模型参数,因此只train一个batch就停止了
    loss_val = np.mean(loss_val)
    auc_val = np.mean(auc_val)
    batch_auc_val = np.mean(batch_auc_val)
    logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                      .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
```

### 2.4 用python再次处理inference program,去除lookup_table OP

这一步是因为`fluid.io.save_inference_model()`裁剪出的program没有将lookup_table OP去除。未来如果`save_inference_model`接口完善,本节可跳过

主要代码:

```python
ef prune_program():
    # 从磁盘读入inference program
    args = parse_args()
    model_dir = args.model_output_dir + "/inference_only"
    model_file = model_dir + "/__model__"
    with open(model_file, "rb") as f:
        protostr = f.read()
    f.close()
    
    # 反序列化为protobuf message
    proto = framework_pb2.ProgramDesc.FromString(six.binary_type(protostr))
    
    # 遍历所有OP,去除lookup_table
    block = proto.blocks[0]
    kept_ops = [op for op in block.ops if op.type != "lookup_table"]
    del block.ops[:]
    block.ops.extend(kept_ops)
    
    # 保存修改后的program
    with open(model_file + ".pruned", "wb") as f:
        f.write(proto.SerializePartialToString())
    f.close()
```
### 2.5 裁剪过程串到一起

我们提供了完整的裁剪CTR预估模型的脚本文件save_program.py,同[CTR分布式训练任务](doc/DISTRIBUTED_TRAINING_AND_SERVING.md)一起发布,可以在trainer和pserver容器的训练脚本目录下找到

## 3. 整个预测计算流程

Client端:
1) Dense feature: 从dataset每条样例读取13个integer features,形成1个dense feature
2) Sparse feature: 从dataset每条样例读取26个categorical feature,分别经过hash(str(feature_index) + feature_string)签名,得到每个feature的id,形成26个sparse feature

Serving端:
1) Dense feature: dense feature共13个float型数字,一起feed到网络dense_input这个variable对应的LodTensor
2) Sparse feature: 26个sparse feature id,分别访问kv服务获取对应的embedding向量,feed到对应的26个embedding layer的output variable。在我们裁剪出来的网络中,这些variable分别对应的变量名为embedding_0.tmp_0, embedding_1.tmp_0, ... embedding_25.tmp_0
3) 执行预测,获取预测结果。