// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <pthread.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/cache.h"
#include "core/predictor/framework/factory.h"
#include "core/predictor/framework/infer_data.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/predictor_metric.h"
#include "paddle_inference_api.h"  // NOLINT
#include "experimental/float16.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {

using configure::ModelToolkitConf;

// Auto (RAII-style) mutex lock: acquired in the constructor, released in the
// destructor
class AutoLock {
 public:
  explicit AutoLock(pthread_mutex_t& mutex) : _mut(mutex) {
    pthread_mutex_lock(&mutex);
  }
  ~AutoLock() { pthread_mutex_unlock(&_mut); }

 private:
  pthread_mutex_t& _mut;
};
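
// Usage sketch (illustration only; the mutex below is hypothetical):
//
//   pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
//   {
//     AutoLock lock(mu);  // acquired in the constructor
//     // ... critical section ...
//   }                     // released when 'lock' leaves scope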

// Global singleton mutex lock
class GlobalCreateMutex {
 public:
  pthread_mutex_t& mutex() { return _mut; }

  static pthread_mutex_t& instance() {
    static GlobalCreateMutex gmutex;
    return gmutex.mutex();
  }

 private:
  GlobalCreateMutex() { pthread_mutex_init(&_mut, NULL); }
  pthread_mutex_t _mut;
};
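
// Usage sketch (illustration only): serialize lazy one-time creation
// across threads with the process-wide mutex.
//
//   AutoLock lock(GlobalCreateMutex::instance());
//   // ... create or initialize a shared resource exactly once ...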

// InferEngine: base class of all inference engines
class InferEngine {
 public:
  virtual ~InferEngine() {}

  virtual int proc_initialize(const configure::EngineDesc& conf, bool version) {
    return proc_initialize_impl(conf, version);
  }
  virtual int proc_finalize() { return proc_finalize_impl(); }
  virtual int thrd_initialize() { return thrd_initialize_impl(); }
  virtual int thrd_clear() { return thrd_clear_impl(); }
  virtual int thrd_finalize() { return thrd_finalize_impl(); }
  virtual int infer(const void* in, void* out, uint32_t batch_size = -1) {
    return infer_impl(in, out, batch_size);
  }
  virtual void set_model_index(uint32_t index) { _model_index = index; }
  virtual int reload() = 0;

  virtual uint64_t version() const = 0;

  // begin: framework inner call
  virtual int proc_initialize_impl(const configure::EngineDesc& conf,
                                   bool version) = 0;
  virtual int thrd_initialize_impl() = 0;
  virtual int thrd_finalize_impl() = 0;
  virtual int thrd_clear_impl() = 0;
  virtual int proc_finalize_impl() = 0;
  virtual int infer_impl(const void* in,
                         void* out,
                         uint32_t batch_size = -1) = 0;
  virtual int task_infer_impl(const void* in, void* out) = 0;  // NOLINT
  virtual CubeCache* get_cube_cache() = 0;

 protected:
  uint32_t _model_index;
  // end: framework inner call
};

typedef im::bsf::Task<paddle::PaddleTensor, paddle::PaddleTensor> TaskT;
class ReloadableInferEngine : public InferEngine {
 public:
  virtual ~ReloadableInferEngine() {}

  // Reloadable record
  union ReloadableRecord {
    time_t timestamp;
    uint64_t md5sum;
    uint64_t revision;
  };
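
  // Reload-check sketch (illustration only, for the timestamp_ne policy):
  // stat() the tag file and compare its mtime against the last record.
  //
  //   struct stat st;
  //   if (stat(_reload_tag_file.c_str(), &st) == 0 &&
  //       st.st_mtime != _last_record.timestamp) {
  //     // the tag file changed since the last load -> a reload is due
  //   }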

  virtual int load(const configure::EngineDesc& conf) = 0;

  int proc_initialize_impl(const configure::EngineDesc& conf, bool version);

  int proc_initialize(const configure::EngineDesc& conf, bool version);

  int infer(const void* in, void* out, uint32_t batch_size = -1);

  int thrd_initialize();

  int thrd_clear();

  int proc_finalize();

  int reload();

  uint64_t version() const { return _version; }
  uint32_t thread_num() const { return _infer_thread_num; }

 private:
  int parse_version_info(const configure::EngineDesc& config, bool version);

  bool check_need_reload();

  bool check_timestamp_ne();

  bool check_timestamp_gt();

  bool check_md5sum() { return false; }

  bool check_revision() { return false; }

 protected:
  // Model directory
  std::string _model_dir;

  // The description of inference engine
  configure::EngineDesc _conf;

 private:
  // Tag file of reloadable model
  std::string _reload_tag_file;

  // Type of reload, e.g. timestamp_ne, timestamp_gt, md5sum, revision
  std::string _reload_type;

  // Record of the last loading information
  ReloadableRecord _last_record;

  // Number of inference threads
  uint32_t _infer_thread_num;

  // Size of inference batch
  uint32_t _infer_batch_size;

  // Whether batch_size needs to be aligned when inferring
  bool _infer_overrun;

  // Whether a request may be split when inferring
  bool _allow_split_request;
  // Model version
  uint64_t _version;
};

// Lock-free switching between two models and cube caches
template <typename EngineCore>
struct ModelData {
  ModelData() : current_idx(1) {
    cores[0] = nullptr;
    cores[1] = nullptr;
    caches[0] = nullptr;
    caches[1] = nullptr;
  }

  ~ModelData() {
    delete cores[0];
    delete cores[1];
    delete caches[0];
    delete caches[1];
  }

  void* get_core() { return cores[current_idx]->get(); }

  CubeCache* get_cache() { return caches[current_idx]; }

  EngineCore* cores[2];
  CubeCache* caches[2];
  uint32_t current_idx;
};
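
// Double-buffering sketch (illustration only): writers fill the idle slot,
// then flip current_idx, so readers switch models without taking a lock.
//
//   uint32_t next_idx = (md->current_idx + 1) % 2;  // idle slot
//   // ... create or reload cores[next_idx] and caches[next_idx] ...
//   md->current_idx = next_idx;  // readers now see the new model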

template <typename EngineCore>
class DBReloadableInferEngine : public ReloadableInferEngine {
 public:
  virtual ~DBReloadableInferEngine() {}

  int proc_initialize(const configure::EngineDesc& conf, bool version) {
    THREAD_KEY_CREATE(&_skey, NULL);
    THREAD_MUTEX_INIT(&_mutex, NULL);
    _gpu_index = 0;
    return ReloadableInferEngine::proc_initialize(conf, version);
  }

  // Process initialization calls load(), but thread initialization has not
  // run yet, so _reload_vec is empty and load() stops there.
  // Hot reloading also calls load(); the threads are initialized by then, so
  // _reload_vec is not empty and load_data() is executed to load the data.
  // Thread initialization runs load_data() to load the data, then appends
  // the engine to _reload_vec.
  // Each model has only one CloneDBReloadableInferEngine object,
  // but one CloneDBReloadableInferEngine object may contain N EngineCores.
  virtual int load(const configure::EngineDesc& conf) {
    if (_reload_vec.empty()) {
      return 0;
    }
    _gpu_index = 0;
    for (uint32_t ti = 0; ti < _reload_vec.size(); ++ti) {
      if (load_data(_reload_vec[ti], conf) != 0) {
        LOG(ERROR) << "Failed reload engine model: " << ti;
        return -1;
      }
    }

    LOG(WARNING) << "Succ load engine, path: " << conf.model_dir();
    RequestCache::GetSingleton()->Clear();
    return 0;
  }

  virtual int load_data(ModelData<EngineCore>* md,
                        const configure::EngineDesc& conf) {
    uint32_t next_idx = (md->current_idx + 1) % 2;

    // reload engine core
    if (md->cores[next_idx]) {
      delete md->cores[next_idx];
    }
    md->cores[next_idx] = new (std::nothrow) EngineCore;
    if (nullptr == md->cores[next_idx]) {
      LOG(ERROR) << "Allocating memory failed. ";
      return -1;
    }
    size_t gpu_ids_num = conf.gpu_ids_size();
    im::bsf::AutoMutex lock(_mutex);
    int gpu_id = -1;
    if (gpu_ids_num > 0) {
      gpu_id = conf.gpu_ids(_gpu_index % gpu_ids_num);
    }
    LOG(WARNING) << "Loading EngineCore[" << next_idx << "] ...";
    if (!md->cores[next_idx] ||
        md->cores[next_idx]->create(conf, gpu_id) != 0) {
      LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
      return -1;
    }
    _gpu_index++;
    LOG(WARNING) << "Loading EngineCore[" << next_idx << "] done.";

    // reload cube cache
    if (nullptr == md->caches[next_idx]) {
      md->caches[next_idx] = new (std::nothrow) CubeCache;
    }

    if (nullptr == md->caches[next_idx]) {
      LOG(ERROR) << "Allocating memory failed.";
      return -1;
    }
    LOG(WARNING) << "Loading cube cache[" << next_idx << "] ...";
    std::string model_path = conf.model_dir();
    if (access(model_path.c_str(), F_OK) == 0) {
      std::string cube_cache_path = model_path + "/cube_cache";
      int reload_cache_ret = md->caches[next_idx]->reload_data(cube_cache_path);
      LOG(WARNING) << "Loading cube cache[" << next_idx << "] done.";
    } else {
      LOG(ERROR) << "model_path " << model_path
                 << " does not exist. Ignore cube cache!";
    }

    // switch current_idx
    md->current_idx = next_idx;
    LOG(WARNING)
        << "Reload model and cube cache done. switching to current_idx["
        << next_idx << "]";
    return 0;
  }

  virtual int thrd_initialize_impl() {
    ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
    if (!md || load_data(md, _conf) != 0) {
      LOG(ERROR) << "Failed create thread data from " << _conf.model_dir();
      return -1;
    }

    THREAD_SETSPECIFIC(_skey, md);
    im::bsf::AutoMutex lock(_mutex);
    _reload_vec.push_back(md);
    return 0;
  }

  int thrd_clear_impl() {
    // There are actually two kinds of multi-threading:
    // 1. brpc threads  2. bsf Task threads.
    // Each request is handled in a single brpc thread.
    //
    // If bsf Task threads are not used,
    // every brpc thread corresponds to all the DBReloadableInferEngines:
    // each request runs all models in its own brpc thread,
    // and every brpc thread creates or clones N predictors,
    // where N is the number of models.
    // So with 2 models and --thread 10,
    // each brpc thread creates a predictor of Model-1 and of Model-2;
    // there are 10 predictors of Model-1 and 10 predictors of Model-2 in
    // total, because there are 10 brpc threads.
    //
    // If bsf Task threads are used,
    // there is a ThreadPool called bsf TaskExecutor.
    // TaskExecutorVector is the vector of TaskExecutors;
    // the number of TaskExecutors equals the number of models,
    // 1 TaskExecutor corresponding to 1 model.
    // 1 TaskExecutor has N bsf threads, and each bsf thread holds 1
    // predictor of the model the TaskExecutor corresponds to.
    // The brpc thread only puts the data into the task_queue (which lives
    // in the TaskExecutor); EngineCore->infer() runs in a bsf Task thread.
    //
    // MempoolWrapper::instance() is actually a thread-local mempool,
    // so it belongs to a single thread.
    return 0;
  }

  int thrd_finalize_impl() { return 0; }

  int proc_finalize_impl() {
    THREAD_KEY_DELETE(_skey);
    THREAD_MUTEX_DESTROY(&_mutex);
    return 0;
  }

  EngineCore* get_core() {
    ModelData<EngineCore>* md =
        (ModelData<EngineCore>*)THREAD_GETSPECIFIC(_skey);
    if (!md) {
      LOG(ERROR) << "Failed get thread specific data";
      return NULL;
    }
    return md->cores[md->current_idx];
  }

  CubeCache* get_cube_cache() {
    ModelData<EngineCore>* md =
        (ModelData<EngineCore>*)THREAD_GETSPECIFIC(_skey);
    if (!md) {
      LOG(ERROR) << "Failed get thread specific data";
      return NULL;
    }
    return md->get_cache();
  }

 protected:
  THREAD_KEY_T _skey;
  THREAD_MUTEX_T _mutex;

  // vector of all model engines
  std::vector<ModelData<EngineCore>*> _reload_vec;

  // Round-robin index over the configured gpu card ids
  int _gpu_index = 0;
};

// Multiple EngineCores share the same model data
template <typename EngineCore>
class CloneDBReloadableInferEngine
    : public DBReloadableInferEngine<EngineCore> {
 public:
  virtual ~CloneDBReloadableInferEngine() {}

  // Process initialization calls load(), but thread initialization has not
  // run yet, so _reload_vec is empty and load() stops there.
  // Hot reloading also calls load(); the threads are initialized by then, so
  // _reload_vec is not empty and load_data() is executed to load the data.
  // Thread initialization runs load_data() to load the data, then appends
  // the engine to _reload_vec.
  // Each model has only one CloneDBReloadableInferEngine object,
  // but one CloneDBReloadableInferEngine object may contain N EngineCores.

  virtual int load_data(ModelData<EngineCore>* md,
                        const configure::EngineDesc& conf) {
    int tid = syscall(SYS_gettid);
    uint32_t next_idx = (md->current_idx + 1) % 2;
    if (md->cores[next_idx]) {
      delete md->cores[next_idx];
    }
    md->cores[next_idx] = new (std::nothrow) EngineCore;

    if (nullptr == md->caches[next_idx]) {
      md->caches[next_idx] = new (std::nothrow) CubeCache;
    }
    if (nullptr == md->cores[next_idx] || nullptr == md->caches[next_idx]) {
      LOG(ERROR) << "Allocating memory fail.";
      return -1;
    }
    // params.dump();
    // gpu_ids_num > 0 always holds afterwards:
    // when running on CPU, gpu_ids = [-1];
    // gpu_ids_num == 0 means no gpu id was given, so we set
    // gpu_ids_num = 1 and keep gpu_id = -1,
    // so that at least one predictor can be created.
    size_t gpu_ids_num = conf.gpu_ids_size();
    im::bsf::AutoMutex lock(DBReloadableInferEngine<EngineCore>::_mutex);
    int gpu_id = -1;
    if (gpu_ids_num > 0) {
      gpu_id = conf.gpu_ids(DBReloadableInferEngine<EngineCore>::_gpu_index %
                            gpu_ids_num);
    } else {
      gpu_ids_num = 1;
    }

    // _gpu_index is reset to 0 when load() or proc_initialize() is called.
    // _gpu_index < gpu_ids_num means some GPU cards still have no predictor
    // created on them, so we create one;
    // _gpu_index >= gpu_ids_num means each GPU card already has one,
    // so we clone the predictor instead.
    // E.g. with gpu_ids = [0, 1], the first two threads create predictors
    // on cards 0 and 1, and later threads clone from those templates.
    LOG(WARNING) << "tid:" << tid << " Loading clone model ...";
    if (DBReloadableInferEngine<EngineCore>::_gpu_index < gpu_ids_num) {
      // create cores
      if (md->cores[next_idx]->create(conf, gpu_id) != 0) {
        LOG(ERROR) << "Failed create model, path: " << conf.model_dir();
        return -1;
      }
      // create caches
      std::string model_path = conf.model_dir();
      if (access(model_path.c_str(), F_OK) == 0) {
        std::string cube_cache_path = model_path + "/cube_cache";
        int reload_cache_ret =
            md->caches[next_idx]->reload_data(cube_cache_path);
        LOG(WARNING) << "create cube cache[" << next_idx << "] done.";
      } else {
        LOG(WARNING) << "model_path " << model_path
                     << " is not exits. Ignore cube cache!";
      }

      DBReloadableInferEngine<EngineCore>::_gpu_index++;
      // md->current_idx = next_idx;
      if (_cloneTemplate.size() <
          DBReloadableInferEngine<EngineCore>::_gpu_index) {
        _cloneTemplate.push_back(md);
      } else {
        _cloneTemplate[DBReloadableInferEngine<EngineCore>::_gpu_index - 1] =
            md;
      }
    } else {
      int template_index = DBReloadableInferEngine<EngineCore>::_gpu_index %
                           _cloneTemplate.size();

      // clone cores
      if (md->cores[next_idx]->clone(
              _cloneTemplate[template_index]->get_core()) != 0) {
        LOG(ERROR) << "Failed clone model from core";
        return -1;
      }
      // clone caches
      md->caches[next_idx] = _cloneTemplate[template_index]->get_cache();
      LOG(WARNING) << "tid:" << tid << " clone caches done";

      DBReloadableInferEngine<EngineCore>::_gpu_index++;
    }

    // switch current_idx
    md->current_idx = next_idx;
    LOG(WARNING)
        << "[" << tid
        << "] Reload clone model and cube cache done. switching to current_idx["
        << next_idx << "]";

    return 0;
  }

 protected:
  // Template EngineCores: once created, the thread-level EngineCores share
  // the model data of these template objects
  std::vector<ModelData<EngineCore>*> _cloneTemplate;
};

template <typename EngineCore>
#ifdef WITH_TRT
class FluidInferEngine : public DBReloadableInferEngine<EngineCore> {
#else
class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
#endif
 public:  // NOLINT
  FluidInferEngine() {}
  ~FluidInferEngine() {}
  typedef std::vector<paddle::PaddleTensor> TensorVector;
  int infer_impl(const void* in, void* out, uint32_t batch_size = -1) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    long start = tv.tv_sec * 1000000 + tv.tv_usec;
    // First of all, get the real core according to the
    // template parameter <EngineCore>.
    EngineCore* core = DBReloadableInferEngine<EngineCore>::get_core();
    if (!core || !core->get()) {
      LOG(ERROR) << "Failed get fluid core in infer_impl()";
      return -1;
    }
    // Process the input data in a loop.
    // In each iteration, use in[i].name as inputName and call
    // 'core->GetInputHandle(inputName)' to get a handle to the input tensor.
    // Set the lod and shape information of the input first,
    // then copy the data from the CPU into the core.
    const TensorVector* tensorVector_in_pointer =
        reinterpret_cast<const TensorVector*>(in);
    for (int i = 0; i < tensorVector_in_pointer->size(); ++i) {
      auto lod_tensor_in =
          core->GetInputHandle((*tensorVector_in_pointer)[i].name);
      lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
      lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
      void* origin_data = (*tensorVector_in_pointer)[i].data.data();
      // The core determines the size of the memory buffer from the data
      // type passed in, so the pointer type of the data must be one of
      // float*, int64_t*, int32_t*, etc., instead of void*.
      if ((*tensorVector_in_pointer)[i].dtype == paddle::PaddleDType::FLOAT32) {
        float* data = static_cast<float*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else if ((*tensorVector_in_pointer)[i].dtype ==
                 paddle::PaddleDType::INT64) {
        int64_t* data = static_cast<int64_t*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else if ((*tensorVector_in_pointer)[i].dtype ==
                 paddle::PaddleDType::INT32) {
        int32_t* data = static_cast<int32_t*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else if ((*tensorVector_in_pointer)[i].dtype ==
                 paddle::PaddleDType::UINT8) {
        uint8_t* data = static_cast<uint8_t*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else if ((*tensorVector_in_pointer)[i].dtype ==
                 paddle::PaddleDType::INT8) {
        int8_t* data = static_cast<int8_t*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else if ((*tensorVector_in_pointer)[i].dtype ==
                 paddle::PaddleDType::FLOAT16) {
        paddle::platform::float16* data =
            static_cast<paddle::platform::float16*>(origin_data);
        lod_tensor_in->CopyFromCpu(data);
      } else {
        LOG(ERROR) << "Inference not support type["
                   << (*tensorVector_in_pointer)[i].dtype << "],name["
                   << (*tensorVector_in_pointer)[i].name << "]"
                   << " copy into core failed!";
      }
      VLOG(2) << "Tensor:name=" << (*tensorVector_in_pointer)[i].name
              << ";in_dtype=" << (*tensorVector_in_pointer)[i].dtype
              << ";tensor_dtype=" << lod_tensor_in->type();
    }
    // After the input data is passed in,
    // call 'core->Run()' to perform the prediction.
    if (!core->Run()) {
      LOG(ERROR) << "Failed run fluid family core";
      return -1;
    }
    // In order to get the results,
    // first call 'core->GetOutputNames()' to get the names of the outputs,
    // then loop over the names and fetch each output handle
    // by calling 'core->GetOutputHandle(name)'.
    std::vector<std::string> outnames = core->GetOutputNames();
    std::vector<int> output_shape;
    int out_num = 0;
    int dataType = 0;
    void* databuf_data = NULL;
    char* databuf_char = NULL;
    size_t databuf_size = 0;
    TensorVector* tensorVector_out_pointer =
        reinterpret_cast<TensorVector*>(out);
    if (!tensorVector_out_pointer) {
      LOG(ERROR) << "tensorVector_out_pointer is nullptr, error";
      return -1;
    }
    // Get the type and shape information of the output first,
    // then copy the data from the core back to the CPU.
    // The pointer type of data_out must be one of
    // float*, int64_t*, int32_t*, etc., instead of void*.
    for (int i = 0; i < outnames.size(); ++i) {
      auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
      output_shape = lod_tensor_out->shape();
      out_num = std::accumulate(
          output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
      dataType = lod_tensor_out->type();
      if (dataType == paddle::PaddleDType::FLOAT32) {
        databuf_size = out_num * sizeof(float);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        float* data_out = reinterpret_cast<float*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::INT64) {
        databuf_size = out_num * sizeof(int64_t);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::INT32) {
        databuf_size = out_num * sizeof(int32_t);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        int32_t* data_out = reinterpret_cast<int32_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::UINT8) {
        databuf_size = out_num * sizeof(uint8_t);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        uint8_t* data_out = reinterpret_cast<uint8_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::INT8) {
        databuf_size = out_num * sizeof(int8_t);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        int8_t* data_out = reinterpret_cast<int8_t*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      } else if (dataType == paddle::PaddleDType::FLOAT16) {
        databuf_size = out_num * sizeof(paddle::platform::float16);
        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
        if (!databuf_data) {
          LOG(ERROR) << "Malloc failed, size: " << databuf_size;
          return -1;
        }
        paddle::platform::float16* data_out =
            reinterpret_cast<paddle::platform::float16*>(databuf_data);
        lod_tensor_out->CopyToCpu(data_out);
        databuf_char = reinterpret_cast<char*>(data_out);
      }

      // Because task scheduling requires OPs to use a 'Channel'
      // (a data structure) to transfer data between OPs,
      // we need to copy the processed data into the 'Channel' for the
      // next OP. Here that means copying 'databuf_char' into 'void* out'
      // (also known as 'tensorVector_out_pointer').
      paddle::PaddleTensor tensor_out;
      tensor_out.name = outnames[i];
      tensor_out.dtype = paddle::PaddleDType(dataType);
      tensor_out.shape.assign(output_shape.begin(), output_shape.end());
      std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
      for (int li = 0; li < out_lod.size(); ++li) {
        std::vector<size_t> lod_element;
        lod_element.assign(out_lod[li].begin(), out_lod[li].end());
        tensor_out.lod.push_back(lod_element);
      }
      paddle::PaddleBuf paddleBuf(databuf_char, databuf_size);
      tensor_out.data = paddleBuf;
      tensorVector_out_pointer->push_back(tensor_out);
    }
    gettimeofday(&tv, NULL);
    long end = tv.tv_sec * 1000000 + tv.tv_usec;
    long total_time = end - start;
    if (PrometheusMetric::Enabled()) {
      PrometheusMetricManager::GetGeneralSingleton()
          ->MetricInferenceCount()
          .Increment(1);
      PrometheusMetricManager::GetGeneralSingleton()
          ->MetricInferenceDuration()
          .Increment(total_time);
    }
    return 0;
  }

  int task_infer_impl(const void* in, void* out) {  // NOLINT
    return infer_impl(in, out);
  }

  CubeCache* get_cube_cache() {
    return DBReloadableInferEngine<EngineCore>::get_cube_cache();
  }
};

typedef FactoryPool<InferEngine> StaticInferFactory;

class VersionedInferEngine : public InferEngine {
 public:
  VersionedInferEngine() { _versions.clear(); }
  ~VersionedInferEngine() {}

  int proc_initialize(const configure::EngineDesc& conf);

  int proc_initialize(const configure::EngineDesc& conf, bool version);

  int proc_finalize();

  int thrd_initialize();

  int thrd_clear();

  int thrd_finalize();

  int reload();

  uint64_t version() const;

  // inference interface
  InferEngine* default_engine() const;

  int infer(const void* in, void* out, uint32_t batch_size);

  template <typename T>
  T* get_core();

  CubeCache* get_cube_cache();

  // versioned inference interface
  int infer(const void* in, void* out, uint32_t batch_size, uint64_t version);

  template <typename T>
  T* get_core(const uint64_t version);

  int proc_initialize_impl(const configure::EngineDesc& conf, bool);

  int thrd_initialize_impl();

  int thrd_finalize_impl();

  int thrd_clear_impl();

  int proc_finalize_impl();

  int infer_impl(const void* in, void* out, uint32_t batch_size = -1);

  int task_infer_impl(const void* in, void* out);

 private:
  boost::unordered_map<uint64_t, InferEngine*> _versions;
};

class InferManager {
 public:
  static InferManager& instance() {
    static InferManager ins;
    return ins;
  }

  int proc_initialize(const char* path,
                      const char* file,
                      std::shared_ptr<int> engine_index_ptr);

  int set_taskexecutor_num(size_t total_engine_num);

  int thrd_initialize();

  int thrd_clear();

  int reload();

  int thrd_finalize();

  int proc_finalize();

  // Inference interface
  int infer(const char* model_name,
            const void* in,
            void* out,
            uint32_t batch_size = -1);

  // get engine core
  template <typename T>
  T* get_core(const char* model_name);

  // get cube cache
  CubeCache* get_cube_cache(const char* model_name);

  // Versioned inference interface
  int infer(const char* model_name,
            const void* in,
            void* out,
            uint32_t batch_size,
            uint64_t version);

  // Versioned get engine core
  template <typename T>
  T* get_core(const char* model_name, const uint64_t version);

  // query model version
  int query_version(const std::string& model, uint64_t& version);

 private:
  boost::unordered_map<std::string, VersionedInferEngine*> _map;
};
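
// Usage sketch (illustration only; the model name is hypothetical, and the
// in/out TensorVector contract follows FluidInferEngine::infer_impl above):
//
//   std::vector<paddle::PaddleTensor> in, out;
//   // ... fill 'in' with named input tensors ...
//   if (InferManager::instance().infer("general_model", &in, &out, 1) != 0) {
//     // handle inference failure
//   }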

}  // namespace predictor
}  // namespace paddle_serving
}  // namespace baidu