ctr_prediction_op.cpp 11.9 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "demo-serving/op/ctr_prediction_op.h"
#include <algorithm>
W
wangguibao 已提交
17
#include <string>
W
wangguibao 已提交
18
#if 0
W
wangguibao 已提交
19 20
#include <iomanip>
#endif
W
wangguibao 已提交
21
#include "cube/cube-api/include/cube_api.h"
W
wangguibao 已提交
22
#include "predictor/framework/infer.h"
W
wangguibao 已提交
23
#include "predictor/framework/kv_manager.h"
W
wangguibao 已提交
24 25
#include "predictor/framework/memory.h"

W
wangguibao 已提交
26 27 28
// Flag where enable profiling mode
DECLARE_bool(enable_ctr_profiling);

W
wangguibao 已提交
29 30 31 32 33 34 35 36 37 38
namespace baidu {
namespace paddle_serving {
namespace serving {

using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::ctr_prediction::CTRResInstance;
using baidu::paddle_serving::predictor::ctr_prediction::Response;
using baidu::paddle_serving::predictor::ctr_prediction::CTRReqInstance;
using baidu::paddle_serving::predictor::ctr_prediction::Request;

W
wangguibao 已提交
39 40
const int VARIABLE_NAME_LEN = 256;

W
wangguibao 已提交
41 42 43 44 45 46 47 48 49 50 51
// Total 26 sparse input + 1 dense input
const int CTR_PREDICTION_INPUT_SLOTS = 27;

// First 26: sparse input
const int CTR_PREDICTION_SPARSE_SLOTS = 26;

// Last 1: dense input
const int CTR_PREDICTION_DENSE_SLOT_ID = 26;
const int CTR_PREDICTION_DENSE_DIM = 13;
const int CTR_PREDICTION_EMBEDDING_SIZE = 10;

W
wangguibao 已提交
52 53 54 55 56
bthread::Mutex CTRPredictionOp::mutex_;
int64_t CTRPredictionOp::cube_time_us_ = 0;
int32_t CTRPredictionOp::cube_req_num_ = 0;
int32_t CTRPredictionOp::cube_req_key_num_ = 0;

W
wangguibao 已提交
57 58 59 60 61 62 63 64 65 66 67 68
void fill_response_with_message(Response *response,
                                int err_code,
                                std::string err_msg) {
  if (response == NULL) {
    LOG(ERROR) << "response is NULL";
    return;
  }

  response->set_err_code(err_code);
  response->set_err_msg(err_msg);
  return;
}
W
wangguibao 已提交
69 70 71 72 73

int CTRPredictionOp::inference() {
  const Request *req = dynamic_cast<const Request *>(get_request_message());

  TensorVector *in = butil::get_object<TensorVector>();
W
wangguibao 已提交
74 75
  Response *res = mutable_data<Response>();

W
wangguibao 已提交
76 77 78
  uint32_t sample_size = req->instances_size();
  if (sample_size <= 0) {
    LOG(WARNING) << "No instances need to inference!";
W
wangguibao 已提交
79
    fill_response_with_message(res, -1, "Sample size invalid");
W
wangguibao 已提交
80
    return 0;
W
wangguibao 已提交
81 82 83
  }

  paddle::PaddleTensor lod_tensors[CTR_PREDICTION_INPUT_SLOTS];
W
wangguibao 已提交
84
  for (int i = 0; i < CTR_PREDICTION_INPUT_SLOTS; ++i) {
W
wangguibao 已提交
85 86 87 88 89 90
    lod_tensors[i].dtype = paddle::PaddleDType::FLOAT32;
    std::vector<std::vector<size_t>> &lod = lod_tensors[i].lod;
    lod.resize(1);
    lod[0].push_back(0);
  }

W
wangguibao 已提交
91
  // Query cube API for sparse embeddings
W
wangguibao 已提交
92 93
  std::vector<uint64_t> keys;
  std::vector<rec::mcube::CubeValue> values;
W
wangguibao 已提交
94 95 96

  for (uint32_t si = 0; si < sample_size; ++si) {
    const CTRReqInstance &req_instance = req->instances(si);
W
wangguibao 已提交
97
    if (req_instance.sparse_ids_size() != CTR_PREDICTION_SPARSE_SLOTS) {
W
wangguibao 已提交
98
      std::ostringstream iss;
W
wangguibao 已提交
99
      iss << "Sparse input size != " << CTR_PREDICTION_SPARSE_SLOTS;
W
wangguibao 已提交
100
      fill_response_with_message(res, -1, iss.str());
W
wangguibao 已提交
101
      return 0;
W
wangguibao 已提交
102 103 104 105 106 107 108
    }

    for (int i = 0; i < req_instance.sparse_ids_size(); ++i) {
      keys.push_back(req_instance.sparse_ids(i));
    }
  }

W
wangguibao 已提交
109 110 111 112
  rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
  predictor::KVManager &kv_manager = predictor::KVManager::instance();
  const predictor::KVInfo *kvinfo =
      kv_manager.get_kv_info(CTR_PREDICTION_MODEL_NAME);
W
wangguibao 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
  if (kvinfo == NULL) {
    LOG(ERROR) << "Sparse param service info not found for model "
               << CTR_PREDICTION_MODEL_NAME
               << ". Maybe forgot to specify sparse_param_service_type and "
               << "sparse_param_service_table_name in "
               << "conf/model_toolkit.prototxt";
    fill_response_with_message(res, -1, "Sparse param service info not found");
    return 0;
  }

  std::string table_name;
  if (kvinfo->sparse_param_service_type != configure::EngineDesc::NONE) {
    table_name = kvinfo->sparse_param_service_table_name;
    if (table_name.empty()) {
      LOG(ERROR) << "sparse_param_service_table_name not specified. "
                 << "Please specify it in conf/model_toolkit.protxt for model "
                 << CTR_PREDICTION_MODEL_NAME;
      fill_response_with_message(
          res, -1, "sparse_param_service_table_name not specified");
W
wangguibao 已提交
132
      return 0;
W
wangguibao 已提交
133
    }
W
wangguibao 已提交
134
  }
W
wangguibao 已提交
135

W
wangguibao 已提交
136 137 138 139 140 141 142 143 144 145
  if (kvinfo->sparse_param_service_type == configure::EngineDesc::LOCAL) {
    // Query local KV service
    LOG(ERROR) << "Local kv service not supported for model "
               << CTR_PREDICTION_MODEL_NAME;

    fill_response_with_message(
        res, -1, "Local kv service not supported for this model");
    return 0;
  } else if (kvinfo->sparse_param_service_type ==
             configure::EngineDesc::REMOTE) {
W
wangguibao 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
    struct timeval start;
    struct timeval end;

    int ret;

    if (FLAGS_enable_ctr_profiling) {
      gettimeofday(&start, NULL);
      ret = cube->seek(table_name, keys, &values);
      gettimeofday(&end, NULL);
      uint64_t usec =
          end.tv_sec * 1e6 + end.tv_usec - start.tv_sec * 1e6 - start.tv_usec;

      // Statistics
      mutex_.lock();
      cube_time_us_ += usec;
      ++cube_req_num_;
      cube_req_key_num_ += keys.size();
      mutex_.unlock();
    } else {
      ret = cube->seek(table_name, keys, &values);
    }

W
wangguibao 已提交
168 169 170 171
    if (ret != 0) {
      fill_response_with_message(res, -1, "Query cube for embeddings error");
      LOG(ERROR) << "Query cube for embeddings error";
      return 0;
W
wangguibao 已提交
172
    }
W
wangguibao 已提交
173
  }
W
wangguibao 已提交
174

W
wangguibao 已提交
175 176 177 178 179 180 181 182 183 184
  if (values.size() != keys.size()) {
    LOG(ERROR) << "Sparse embeddings not ready; "
               << "maybe forgot to set sparse_param_service_type and "
               << "sparse_param_sevice_table_name for "
               << CTR_PREDICTION_MODEL_NAME
               << " in conf/model_toolkit.prototxt";
    fill_response_with_message(
        res, -1, "Sparse param service not configured properly");
    return 0;
  }
W
wangguibao 已提交
185

W
wangguibao 已提交
186 187 188 189 190 191 192 193 194 195 196
  for (int i = 0; i < keys.size(); ++i) {
    std::ostringstream oss;
    oss << keys[i] << ": ";
    const char *value = (values[i].buff.data());
    if (values[i].buff.size() !=
        sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
      LOG(WARNING) << "Key " << keys[i] << " has values less than "
                   << CTR_PREDICTION_EMBEDDING_SIZE;
    }

#if 0
W
wangguibao 已提交
197 198
      for (int j = 0; j < values[i].buff.size(); ++j) {
        oss << std::hex << std::uppercase << std::setw(2) << std::setfill('0')
W
wangguibao 已提交
199
            << (static_cast<int>(value[j]) & 0xff);
W
wangguibao 已提交
200 201 202 203
      }

      LOG(INFO) << oss.str().c_str();
#endif
W
wangguibao 已提交
204
  }
W
wangguibao 已提交
205

W
wangguibao 已提交
206
  // Sparse embeddings
W
wangguibao 已提交
207
  for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
W
wangguibao 已提交
208
    paddle::PaddleTensor &lod_tensor = lod_tensors[i];
W
wangguibao 已提交
209 210
    std::vector<std::vector<size_t>> &lod = lod_tensor.lod;

W
wangguibao 已提交
211 212 213 214
    char name[VARIABLE_NAME_LEN];
    snprintf(name, VARIABLE_NAME_LEN, "embedding_%d.tmp_0", i);
    lod_tensor.name = std::string(name);

W
wangguibao 已提交
215 216 217 218 219
    for (uint32_t si = 0; si < sample_size; ++si) {
      const CTRReqInstance &req_instance = req->instances(si);
      lod[0].push_back(lod[0].back() + 1);
    }

W
wangguibao 已提交
220 221 222
    lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_EMBEDDING_SIZE};
    lod_tensor.data.Resize(lod[0].back() * sizeof(float) *
                           CTR_PREDICTION_EMBEDDING_SIZE);
W
wangguibao 已提交
223 224 225

    int offset = 0;
    for (uint32_t si = 0; si < sample_size; ++si) {
W
wangguibao 已提交
226
      float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
W
wangguibao 已提交
227
      const CTRReqInstance &req_instance = req->instances(si);
W
wangguibao 已提交
228 229 230 231 232 233 234

      int idx = si * CTR_PREDICTION_SPARSE_SLOTS + i;
      if (values[idx].buff.size() !=
          sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
        LOG(ERROR) << "Embedding vector size not expected";
        fill_response_with_message(
            res, -1, "Embedding vector size not expected");
W
wangguibao 已提交
235
        return 0;
W
wangguibao 已提交
236 237 238 239
      }

      memcpy(data_ptr, values[idx].buff.data(), values[idx].buff.size());
      offset += CTR_PREDICTION_EMBEDDING_SIZE;
W
wangguibao 已提交
240 241 242 243 244
    }

    in->push_back(lod_tensor);
  }

W
wangguibao 已提交
245
  // Dense features
W
wangguibao 已提交
246 247
  paddle::PaddleTensor &lod_tensor = lod_tensors[CTR_PREDICTION_DENSE_SLOT_ID];
  lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
W
wangguibao 已提交
248
  std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
W
wangguibao 已提交
249
  lod_tensor.name = std::string("dense_input");
W
wangguibao 已提交
250 251 252

  for (uint32_t si = 0; si < sample_size; ++si) {
    const CTRReqInstance &req_instance = req->instances(si);
W
wangguibao 已提交
253 254 255 256
    if (req_instance.dense_ids_size() != CTR_PREDICTION_DENSE_DIM) {
      std::ostringstream iss;
      iss << "dense input size != " << CTR_PREDICTION_DENSE_DIM;
      fill_response_with_message(res, -1, iss.str());
W
wangguibao 已提交
257
      return 0;
W
wangguibao 已提交
258
    }
W
wangguibao 已提交
259 260 261
    lod[0].push_back(lod[0].back() + req_instance.dense_ids_size());
  }

W
wangguibao 已提交
262 263 264
  lod_tensor.shape = {lod[0].back() / CTR_PREDICTION_DENSE_DIM,
                      CTR_PREDICTION_DENSE_DIM};
  lod_tensor.data.Resize(lod[0].back() * sizeof(float));
W
wangguibao 已提交
265 266 267

  int offset = 0;
  for (uint32_t si = 0; si < sample_size; ++si) {
W
wangguibao 已提交
268
    float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
W
wangguibao 已提交
269 270 271
    const CTRReqInstance &req_instance = req->instances(si);
    int id_count = req_instance.dense_ids_size();
    memcpy(data_ptr,
W
wangguibao 已提交
272
           req_instance.dense_ids().data(),
W
wangguibao 已提交
273
           sizeof(float) * req_instance.dense_ids_size());
W
wangguibao 已提交
274 275 276 277 278 279 280 281
    offset += req_instance.dense_ids_size();
  }

  in->push_back(lod_tensor);

  TensorVector *out = butil::get_object<TensorVector>();
  if (!out) {
    LOG(ERROR) << "Failed get tls output object";
W
wangguibao 已提交
282
    fill_response_with_message(res, -1, "Failed get thread local resource");
W
wangguibao 已提交
283
    return 0;
W
wangguibao 已提交
284 285 286 287 288 289 290
  }

  // call paddle fluid model for inferencing
  if (predictor::InferManager::instance().infer(
          CTR_PREDICTION_MODEL_NAME, in, out, sample_size)) {
    LOG(ERROR) << "Failed do infer in fluid model: "
               << CTR_PREDICTION_MODEL_NAME;
W
wangguibao 已提交
291
    fill_response_with_message(res, -1, "Failed do infer in fluid model");
W
wangguibao 已提交
292
    return 0;
W
wangguibao 已提交
293 294
  }

W
wangguibao 已提交
295 296 297 298
  if (out->size() != 1) {
    LOG(ERROR) << "Model returned number of fetch tensor more than 1";
    fill_response_with_message(
        res, -1, "Model returned number of fetch tensor more than 1");
W
wangguibao 已提交
299
    return 0;
W
wangguibao 已提交
300 301
  }

W
wangguibao 已提交
302 303 304 305 306 307 308
  int output_shape_dim = out->at(0).shape.size();
  if (output_shape_dim != 2) {
    LOG(ERROR) << "Fetch LoDTensor should be shape of [sample_size, 2]";
    fill_response_with_message(
        res, -1, "Fetch LoDTensor should be shape of [sample_size, 2]");
    return 0;
  }
W
wangguibao 已提交
309

W
wangguibao 已提交
310 311 312 313 314 315
  if (out->at(0).dtype != paddle::PaddleDType::FLOAT32) {
    LOG(ERROR) << "Fetch LoDTensor data type should be FLOAT32";
    fill_response_with_message(
        res, -1, "Fetch LoDTensor data type should be FLOAT32");
    return 0;
  }
W
wangguibao 已提交
316

W
wangguibao 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
  int dim1 = out->at(0).shape[0];
  int dim2 = out->at(0).shape[1];

  if (dim1 != sample_size) {
    LOG(ERROR) << "Returned result count not equal to sample_size";
    fill_response_with_message(
        res, -1, "Returned result count not equal to sample size");
    return 0;
  }

  if (dim2 != 2) {
    LOG(ERROR) << "Returned result is not expected, should be 2 floats for "
                  "each sample";
    fill_response_with_message(
        res, -1, "Retunred result is not 2 floats for each sample");
    return 0;
  }

  float *data = static_cast<float *>(out->at(0).data.data());
  for (int i = 0; i < dim1; ++i) {
    CTRResInstance *res_instance = res->add_predictions();
    res_instance->set_prob0(data[i * dim2]);
    res_instance->set_prob1(data[i * dim2 + 1]);
W
wangguibao 已提交
340 341 342 343 344 345 346 347 348 349 350 351 352
  }

  for (size_t i = 0; i < in->size(); ++i) {
    (*in)[i].shape.clear();
  }
  in->clear();
  butil::return_object<TensorVector>(in);

  for (size_t i = 0; i < out->size(); ++i) {
    (*out)[i].shape.clear();
  }
  out->clear();
  butil::return_object<TensorVector>(out);
W
wangguibao 已提交
353 354 355

  res->set_err_code(0);
  res->set_err_msg(std::string(""));
W
wangguibao 已提交
356 357 358 359 360 361 362 363
  return 0;
}

DEFINE_OP(CTRPredictionOp);

}  // namespace serving
}  // namespace paddle_serving
}  // namespace baidu