general_reader_op.cpp 9.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

M
MRXLT 已提交
15
#include "core/general-server/op/general_reader_op.h"
16 17 18 19
#include <algorithm>
#include <iostream>
#include <memory>
#include <sstream>
20
#include "core/general-server/op/general_infer_helper.h"
21 22
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
G
guru4elephant 已提交
23
#include "core/util/include/timer.h"
24 25 26 27 28

namespace baidu {
namespace paddle_serving {
namespace serving {

G
guru4elephant 已提交
29
using baidu::paddle_serving::Timer;
30 31 32 33 34 35 36 37 38 39
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;

int conf_check(const Request *req,
               const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
  int var_num = req->insts(0).tensor_array_size();
  if (var_num != model_config->_feed_type.size()) {
B
barriery 已提交
40 41 42
    LOG(ERROR) << "feed var number not match: model config["
               << model_config->_feed_type.size() << "] vs. actual[" << var_num
               << "]";
43 44
    return -1;
  }
45 46 47

  VLOG(2) << "fetch var num in reader op: " << req->fetch_var_names_size();

48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
  for (int i = 0; i < var_num; ++i) {
    if (model_config->_feed_type[i] !=
        req->insts(0).tensor_array(i).elem_type()) {
      LOG(ERROR) << "feed type not match.";
      return -1;
    }
    if (model_config->_feed_shape[i].size() ==
        req->insts(0).tensor_array(i).shape_size()) {
      for (int j = 0; j < model_config->_feed_shape[i].size(); ++j) {
        req->insts(0).tensor_array(i).shape(j);
        if (model_config->_feed_shape[i][j] !=
            req->insts(0).tensor_array(i).shape(j)) {
          LOG(ERROR) << "feed shape not match.";
          return -1;
        }
      }
    } else {
      LOG(ERROR) << "feed shape not match.";
      return -1;
    }
  }
  return 0;
}

int GeneralReaderOp::inference() {
  // reade request from client
  const Request *req = dynamic_cast<const Request *>(get_request_message());
B
barriery 已提交
75
  uint64_t log_id = req->log_id();
76 77 78 79 80 81 82

  int batch_size = req->insts_size();
  int input_var_num = 0;
  std::vector<int64_t> elem_type;
  std::vector<int64_t> elem_size;
  std::vector<int64_t> capacity;

83 84
  GeneralBlob *res = mutable_data<GeneralBlob>();
  TensorVector *out = &res->tensor_vector;
85

M
MRXLT 已提交
86
  res->SetBatchSize(batch_size);
B
barriery 已提交
87
  res->SetLogId(log_id);
M
MRXLT 已提交
88

89
  if (!res) {
B
barriery 已提交
90 91
    LOG(ERROR) << "(logid=" << log_id
               << ") Failed get op tls reader object output";
92 93
  }

G
guru4elephant 已提交
94 95
  Timer timeline;
  int64_t start = timeline.TimeStampUS();
96
  int var_num = req->insts(0).tensor_array_size();
B
barriery 已提交
97
  VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
98

B
barriery 已提交
99 100
  VLOG(2) << "(logid=" << log_id
          << ") start to call load general model_conf op";
101 102 103
  baidu::paddle_serving::predictor::Resource &resource =
      baidu::paddle_serving::predictor::Resource::instance();

B
barriery 已提交
104
  VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
105 106 107
  std::shared_ptr<PaddleGeneralModelConfig> model_config =
      resource.get_general_model_config();

B
barriery 已提交
108
  VLOG(2) << "(logid=" << log_id << ") print general model config done.";
109

110
  // TODO(guru4elephant): how to do conditional check?
111
  /*
112 113
  int ret = conf_check(req, model_config);
  if (ret != 0) {
114
    LOG(ERROR) << "model conf of server:";
115 116 117
    resource.print_general_model_config(model_config);
    return 0;
  }
118
  */
119 120 121 122 123
  // package tensor

  elem_type.resize(var_num);
  elem_size.resize(var_num);
  capacity.resize(var_num);
124 125

  // prepare basic information for input
126
  for (int i = 0; i < var_num; ++i) {
127
    paddle::PaddleTensor lod_tensor;
128
    elem_type[i] = req->insts(0).tensor_array(i).elem_type();
B
barriery 已提交
129 130
    VLOG(2) << "(logid=" << log_id << ") var[" << i
            << "] has elem type: " << elem_type[i];
131 132 133
    if (elem_type[i] == 0) {  // int64
      elem_size[i] = sizeof(int64_t);
      lod_tensor.dtype = paddle::PaddleDType::INT64;
M
MRXLT 已提交
134
    } else if (elem_type[i] == 1) {
135 136
      elem_size[i] = sizeof(float);
      lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
M
MRXLT 已提交
137 138 139
    } else if (elem_type[i] == 2) {
      elem_size[i] = sizeof(int32_t);
      lod_tensor.dtype = paddle::PaddleDType::INT32;
140 141
    }

M
MRXLT 已提交
142
    if (model_config->_is_lod_feed[i]) {
143 144
      lod_tensor.lod.resize(1);
      lod_tensor.lod[0].push_back(0);
B
barriery 已提交
145
      VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
146 147 148 149 150
    } else {
      lod_tensor.shape.push_back(batch_size);
      capacity[i] = 1;
      for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
        int dim = req->insts(0).tensor_array(i).shape(k);
B
barriery 已提交
151 152
        VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
                << "]: " << dim;
153 154 155
        capacity[i] *= dim;
        lod_tensor.shape.push_back(dim);
      }
B
barriery 已提交
156 157
      VLOG(2) << "(logid=" << log_id << ") var[" << i
              << "] is tensor, capacity: " << capacity[i];
158
    }
159
    lod_tensor.name = model_config->_feed_name[i];
160
    out->push_back(lod_tensor);
161 162
  }

163
  // specify the memory needed for output tensor_vector
164
  for (int i = 0; i < var_num; ++i) {
165
    if (out->at(i).lod.size() == 1) {
M
MRXLT 已提交
166
      int tensor_size = 0;
167 168
      for (int j = 0; j < batch_size; ++j) {
        const Tensor &tensor = req->insts(j).tensor_array(i);
169 170 171
        int data_len = 0;
        if (tensor.int64_data_size() > 0) {
          data_len = tensor.int64_data_size();
M
MRXLT 已提交
172
        } else if (tensor.float_data_size() > 0) {
173
          data_len = tensor.float_data_size();
M
MRXLT 已提交
174 175
        } else if (tensor.int_data_size() > 0) {
          data_len = tensor.int_data_size();
176
        }
B
barriery 已提交
177 178
        VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
                << "]: " << data_len;
M
MRXLT 已提交
179
        tensor_size += data_len;
180

181
        int cur_len = out->at(i).lod[0].back();
B
barriery 已提交
182
        VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
183

M
MRXLT 已提交
184
        int sample_len = 0;
M
fix bug  
MRXLT 已提交
185 186 187 188 189 190
        if (tensor.shape_size() == 1) {
          sample_len = data_len;
        } else {
          sample_len = tensor.shape(0);
        }
        out->at(i).lod[0].push_back(cur_len + sample_len);
B
barriery 已提交
191
        VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
M
MRXLT 已提交
192 193 194 195 196
      }
      out->at(i).data.Resize(tensor_size * elem_size[i]);
      out->at(i).shape = {out->at(i).lod[0].back()};
      for (int j = 1; j < req->insts(0).tensor_array(i).shape_size(); ++j) {
        out->at(i).shape.push_back(req->insts(0).tensor_array(i).shape(j));
197
      }
M
fix bug  
MRXLT 已提交
198 199 200
      if (out->at(i).shape.size() == 1) {
        out->at(i).shape.push_back(1);
      }
B
barriery 已提交
201
      VLOG(2) << "(logid=" << log_id << ") var[" << i
202
              << "] is lod_tensor and len=" << out->at(i).lod[0].back();
203
    } else {
204
      out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
B
barriery 已提交
205
      VLOG(2) << "(logid=" << log_id << ") var[" << i
206 207 208 209
              << "] is tensor and capacity=" << batch_size * capacity[i];
    }
  }

210
  // fill the data into output general_blob
211 212
  for (int i = 0; i < var_num; ++i) {
    if (elem_type[i] == 0) {
213
      int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
B
barriery 已提交
214 215
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).int64_data(0);
216 217
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
218 219
        int elem_num = req->insts(j).tensor_array(i).int64_data_size();
        for (int k = 0; k < elem_num; ++k) {
B
barrierye 已提交
220
          dst_ptr[offset + k] = req->insts(j).tensor_array(i).int64_data(k);
221
        }
222 223
        if (out->at(i).lod.size() == 1) {
          offset = out->at(i).lod[0][j + 1];
224 225 226 227
        } else {
          offset += capacity[i];
        }
      }
M
MRXLT 已提交
228
    } else if (elem_type[i] == 1) {
229
      float *dst_ptr = static_cast<float *>(out->at(i).data.data());
B
barriery 已提交
230 231
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).float_data(0);
232 233
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
234 235
        int elem_num = req->insts(j).tensor_array(i).float_data_size();
        for (int k = 0; k < elem_num; ++k) {
B
barrierye 已提交
236
          dst_ptr[offset + k] = req->insts(j).tensor_array(i).float_data(k);
237
        }
238 239
        if (out->at(i).lod.size() == 1) {
          offset = out->at(i).lod[0][j + 1];
240 241 242 243
        } else {
          offset += capacity[i];
        }
      }
M
MRXLT 已提交
244 245
    } else if (elem_type[i] == 2) {
      int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
B
barriery 已提交
246 247
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).int_data(0);
M
MRXLT 已提交
248 249 250 251 252 253 254 255 256 257 258 259
      int offset = 0;
      for (int j = 0; j < batch_size; ++j) {
        int elem_num = req->insts(j).tensor_array(i).int_data_size();
        for (int k = 0; k < elem_num; ++k) {
          dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
        }
        if (out->at(i).lod.size() == 1) {
          offset = out->at(i).lod[0][j + 1];
        } else {
          offset += capacity[i];
        }
      }
260 261 262
    }
  }

B
barriery 已提交
263
  VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
264

G
guru4elephant 已提交
265 266 267 268 269 270
  timeline.Pause();
  int64_t end = timeline.TimeStampUS();
  res->p_size = 0;
  AddBlobInfo(res, start);
  AddBlobInfo(res, end);

B
barriery 已提交
271
  VLOG(2) << "(logid=" << log_id << ") read data from client success";
272 273 274 275 276 277
  return 0;
}
// NOTE(review): DEFINE_OP is a macro defined outside this file; presumably it
// registers GeneralReaderOp with the serving op framework - confirm.
DEFINE_OP(GeneralReaderOp);
}  // namespace serving
}  // namespace paddle_serving
}  // namespace baidu