// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#ifdef BCLOUD
#include <base/atomicops.h>
#else
#include <butil/atomicops.h>
#endif
#include <sys/syscall.h>
#include <boost/bind.hpp>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/memory.h"

// this file is included by bsf.h
namespace im {
namespace bsf {

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& batchTask) {
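  // Lazily creates the fetch/output tensors for this task the first time a
  // worker thread needs them; guarded by double-checked locking on
  // fetch_init below.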
  // Double-checked locking to reduce the granularity of locking.
  if (!fetch_init) {
    if (total_taskmeta_num > 1) {
      // When the task is split into multiple taskmeta, a lock is required.
      AutoMutex lock(task_mut);
      task_fetch_create(batchTask);
    } else {
      // When the task has only one taskmeta, no lock is needed.
      task_fetch_create(batchTask);
    }
  }
  return true;
}

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& batchTask) {
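  // Allocates one output PaddleTensor per fetchvar of the batch. Three cases
  // are handled below: nobatch fetchvar (batch fixed to 1), lod fetchvar
  // (total size unknown until after prediction, so temporary per-taskmeta
  // tensors are prepared), and ordinary fetchvar (batch equals this task's
  // batch_size()).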
  if (!fetch_init) {
    vector_fetch_lod_index = batchTask.vector_fetch_lod_index;
    set_fetch_nobatch_index = batchTask.set_fetch_nobatch_index;
    OutVectorT taskMetaOutLodTensor;
    size_t fetchvar_num = batchTask._batch_out.size();
    for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
         ++fetchvar_index) {
      size_t fetchvar_bytesize_index =
          batchTask.fetchvar_bytesize(fetchvar_index);
      size_t fetchvar_batch = 0;
      // 1. nobatch fetchvar case
      if (set_fetch_nobatch_index.size() > 0 &&
          set_fetch_nobatch_index.find(fetchvar_index) !=
              set_fetch_nobatch_index.end()) {
        fetchvar_batch = 1;
      } else if (vector_fetch_lod_index.size() > 0 &&
                 std::find(vector_fetch_lod_index.begin(),
                           vector_fetch_lod_index.end(),
                           fetchvar_index) != vector_fetch_lod_index.end()) {
        // lod fetchvar case: the total shape[0] cannot be determined yet.
        // Allocate task_num temporary buffers according to the total task_num
        // of the task, copy each lod fetchvar into its own temporary buffer,
        // and finally compute the total size and merge the fetchvar data and
        // lod.
        fetchvar_batch = 0;
      } else {
        // Ordinary fetchvar case: the total fetchvar_batch of this Task
        // equals the total input batch_size().
        fetchvar_batch = batch_size();
      }
      paddle::PaddleTensor tensor_out;
      tensor_out.name = batchTask._batch_out[fetchvar_index].name;
      tensor_out.dtype =
          paddle::PaddleDType(batchTask._batch_out[fetchvar_index].dtype);
      tensor_out.shape = batchTask._batch_out[fetchvar_index].shape;
      tensor_out.shape[0] = fetchvar_batch;
      if (fetchvar_batch != 0) {
        // The lod is empty at this point.
        tensor_out.lod = batchTask._batch_out[fetchvar_index].lod;
        // resize all batch memory at one time

        size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;

        void* databuf_data =
            MempoolWrapper::instance().malloc(databuf_size, memoryPtr);
        paddle::PaddleBuf paddleBuf(databuf_data, databuf_size);
        tensor_out.data = paddleBuf;

        // tensor_out.data.Resize(databuf_size);
      } else {
        // When taskmeta_num == 1, only one taskMeta operates on the task at a
        // time, so there is no thread-safety issue and the data can go
        // directly through taskMeta->task->resize->copy.

        // When the task is split into multiple taskMeta, temporary objects
        // are needed to record the results, which are merged once all of them
        // have been collected.
        if (total_taskmeta_num > 1) {
          taskMetaOutLodTensor.push_back(tensor_out);
        }
      }
      outVectorT_ptr->push_back(tensor_out);
    }
    // outLodTensorVector is actually a two-level vector whose
    // shape is taskmeta_num * vector_fetch_lod_index.size();
    outLodTensorVector.resize(total_taskmeta_num, taskMetaOutLodTensor);
    fetch_init = true;
  }
  return true;
}

template <typename TaskT>
void* TaskExecutor<TaskT>::thread_entry(void* args) {
  ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
  TaskExecutor<TaskT>* executor =
      static_cast<TaskExecutor<TaskT>*>(context->executor);
  executor->work(context);

  return nullptr;
}

template <typename TaskT>
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
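  // Spawns `thread_num` worker threads running TaskExecutor::work(), then
  // polls each context's init_status until every worker reports success, an
  // error occurs, or roughly `init_timeout_sec` elapses.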
  _stop = false;
  if (!_thread_contexts.empty()) {
    LOG(WARNING) << "BSF has started";
    return 0;
  }

  if (thread_num == 0) {
    LOG(ERROR) << "cannot init BSF with zero thread";
    return -1;
  }

  ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
  for (uint32_t i = 0; i < thread_num; ++i) {
    contexts[i].executor = this;
    if (_user_thread_contexts != NULL) {
      contexts[i].user_thread_context = _user_thread_contexts[i];
    }

    int rc = THREAD_CREATE(
        &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
    if (rc != 0) {
      LOG(ERROR) << "failed to create BSF worker thread: index=" << i
                 << ", rc=" << rc << ", errno=" << errno << ":"
                 << strerror(errno);
      return -1;
    }

    _thread_contexts.push_back(&contexts[i]);
  }

  size_t init_timeout = init_timeout_sec * 1000 * 1000;
  bool has_error = false;

  bool has_timeout = true;
  if (init_timeout == 0) {
    has_timeout = false;
  }

  while (!has_timeout || init_timeout > 0) {
    bool done = true;
    for (size_t i = 0; i < _thread_contexts.size(); ++i) {
      if (_thread_contexts[i]->init_status < 0) {
        has_error = true;
        break;
      }

      if (_thread_contexts[i]->init_status == 0) {
        done = false;
      }
    }

    if (has_error) {
      LOG(ERROR) << "BSF thread init error";
      return -1;
    }

    if (done) {
      LOG(INFO) << "BSF thread init done";
      return 0;
    }

    // 100ms
    const size_t sleep_interval = 100 * 1000;
    usleep(sleep_interval);
    init_timeout -= sleep_interval;
  }

  LOG(ERROR) << "BSF thread init timed out";
  return -1;
}

template <typename TaskT>
void TaskExecutor<TaskT>::stop() {
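  // Cancel and join every worker thread, then clear the recorded contexts.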
  _stop = true;
  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_CANCEL(_thread_contexts[i]->tid);
  }

  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_JOIN(_thread_contexts[i]->tid, NULL);
  }
  _thread_contexts.clear();
}

template <typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
    const void* inVectorT_ptr,
    void* outVectorT_ptr,
    MempoolRegion* memoryPtr) {  // NOLINT
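  // Wraps the request into a pooled TaskT, creates a pipe whose descriptors
  // are used to signal completion back to the caller, then enqueues the task
  // on _task_queue and signals the condition variable so a worker picks it
  // up. The returned TaskHandler carries the pipe fds that
  // TaskManager::wait() later blocks on.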
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed to get TaskT from object pool";
    return TaskHandler<TaskT>::valid_handle();
  }
  task->clear();

  /*
  if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
    LOG(ERROR) << "Invalid input & output";
    return TaskHandler<TaskT>::valid_handle();
  }
  */

  int fds[2];
  int rc = pipe(fds);
  if (rc != 0) {
    LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
               << strerror(errno);
    return TaskHandler<TaskT>::valid_handle();
  }

  task->read_fd = fds[0];
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);
  task->memoryPtr = memoryPtr;
  // task->_bspec_key = _bspec_key;
  task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
  task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
  if (!task->task_init()) {
    LOG(ERROR) << "task->task_init() failed";
  }
  task->rem = task->batch_size();
  task->index.store(0, butil::memory_order_relaxed);
  AutoMutex lock(_mut);
  _task_queue.push_back(task);
  THREAD_COND_SIGNAL(&_cond);

  return TaskHandler<TaskT>(*task);
}

// This function is accessed by multiple threads, so an AutoMutex is taken
// first; as a result batchTask.append_task() is thread safe and no extra
// lock is needed inside append_task().
// The task is already initialized at this point.
template <typename TaskT>
bool TaskExecutor<TaskT>::move_task_to_batch(
    BatchTasks<TaskT>& batchTask) {  // NOLINT
  AutoMutex lock(_mut);
  while (_task_queue.empty()) {
    THREAD_COND_WAIT(&_cond, &_mut);
  }

  if (_task_queue.empty()) {
    LOG(ERROR) << "invalid task queue!";
    return false;
  }

  TaskT* previous_task = nullptr;
  while (!_task_queue.empty()) {
    TaskT* task = _task_queue.front();

    // Since we cannot tell whether a fetchvar is lod (even if the input is
    // not lod, the output may be lod), the simple approach is to never split
    // a task: user requests may be merged and predicted together, but a
    // single request is never split into two smaller parts. Setting the
    // engine attribute allow_split_request = false is enough for this.

    // The complex approach is to allow splitting a Task, whether or not it
    // contains lod. The difficulty: before prediction we know how many
    // taskmeta the task was split into, but only after prediction do we know
    // how many fetchvar there are and how many of them are lod.
    // Therefore the task must first create taskmeta_num * fetchvar_num (lod
    // type) temporary PaddleTensor objects (storing data and lod).
    // Because the unit of multi-thread scheduling is the taskmeta, they can
    // only be created in notify_task via taskmeta->task. Since multiple
    // taskmeta map to one task, there is multi-thread contention, so a lock
    // inside the task is required.
    // Atomic operations are not enough, because every thread must wait until
    // the PaddleTensor objects above have been created before it can proceed.
    // Ordinary fetchvars also need the lock to create their PaddleTensor
    // before anything can be copied into it.

    // _overrun indicates whether an asynchronous BatchTasks may temporarily
    // exceed its batch limit in a single round.
    // When _overrun is true, even if the BatchTasks only has room for one
    // more batch, a complete Task is still put in, temporarily exceeding the
    // limit. When _overrun is false, this is not allowed.
    // If the model itself has a maximum batch limit, set it to false (the
    // default). If the model has no maximum batch limit but you set a maximum
    // batch for BatchTasks yourself, consider setting it to true.

    // _allow_split_request == true allows a task to be split: if BatchTasks
    // has room for one more batch, that batch is split off from the next
    // Task.
    // _allow_split_request == false means a task is never split, and the
    // remaining room in BatchTasks is wasted.
    // The default is true, which allows splitting tasks to maximize space
    // utilization.
    if (!batchTask.get_allow_split_request()) {
      if (task->batch_size() > batchTask.get_rem_size() &&
          !batchTask.get_overrun()) {
        break;
      }
    }

    // combine_task_valid() decides whether this task can be merged with the
    // previous one.
    // Apart from the outermost dimension, the inner shapes must be identical
    // (or padding must be allowed) for the merge to happen.
    // Otherwise break out of the loop and put the task into the next
    // batchTask, which guarantees that the tasks passed to
    // batch.append_task(task) share the same inner shapes.

    // For feedvars with shape[0] == 1 while the batch is larger, only one of
    // the values is kept when merging, so that feedvar must be identical
    // across tasks for the merge to happen.
    // Otherwise break out of the loop and put the task into the next
    // batchTask.
    // PaddleTensor and PaddleBuf do not overload operator==, so raw memory is
    // compared instead.
    if (previous_task != nullptr) {
      if (task->combine_task_valid(previous_task) == 0) {
        break;
      }
    }

    if (batchTask.padding(task) != 2) {
      break;
    }
    size_t rem = batchTask.append_task(task);
    previous_task = task;
    if (task->rem <= 0) {
      _task_queue.pop_front();
    }
    if (rem <= 0) break;
  }
  LOG(INFO) << "Number of tasks remaining in _task_queue is "
            << _task_queue.size();
  return true;
}

// This function is accessed by multiple threads.
// move_task_to_batch() takes the lock inside the function.
// It packages 1 TaskT as 1 or several TaskMeta objects.
// The TaskT comes from the singleton TaskExecutor`s _task_queue.
// Although each TaskMeta is a local variable, several TaskMeta may point to
// the same TaskT obtained from the singleton TaskExecutor`s _task_queue.
// The TaskMeta objects are put into the local variable BatchTasks<TaskT>
// batchTask.

// batchTask.merge_tasks() and batchTask.notify_tasks() take no lock.
// BatchTasks<TaskT> batchTask itself is a local variable, so it is thread
// safe.
// If batchTask.merge_tasks() or batchTask.notify_tasks() modifies a TaskMeta,
// you need to pay attention to that:
// each thread deals with its own TaskMeta objects (they are created as local
// variables), but different TaskMeta may point to the same TaskT obtained
// from the singleton TaskExecutor`s _task_queue.

template <typename TaskT>
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
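  // Worker-thread main loop: initialize the thread-local mempool and run the
  // optional user thread-init hook once, then repeatedly batch queued tasks,
  // run the prediction callback `_fn`, and notify the waiting tasks.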
  if (MempoolWrapper::instance().thread_initialize() != 0) {
    LOG(ERROR) << "Failed thread initialize mempool";
    return -1;
  }

  if (_thread_init_fn != NULL) {
    if (_thread_init_fn(context->user_thread_context) != 0) {
      LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
      context->init_status = -1;
      return -1;
    } else {
      LOG(INFO) << "execute thread init thunk succeed";
    }
  }

  context->init_status = 1;
  while (!_stop) {
    if (_thread_reset_fn != NULL) {
      if (_thread_reset_fn(context->user_thread_context) != 0) {
        LOG(ERROR) << "execute user thread reset failed";
      }
    }

    if (MempoolWrapper::instance().thread_clear() != 0) {
      LOG(ERROR) << "Failed thread clear mempool";
      return -1;
    }

    // move_task_to_batch() takes the original tasks from the `_task_queue`
    // and puts them into the batch`s own Vector<taskmeta>; the capacity of
    // that Vector<taskmeta> is decided by `_batch_size` or `_overrun`.

    // merge_tasks() moves the input data from the Vector<taskmeta> into
    // `_batch_in`, because the predictor`s input is `_batch_in`.

    // notify_tasks() moves the output data from `_batch_out` into every
    // single taskmeta, because the predictor`s output is `_batch_out`.
    BatchTasks<TaskT> batchTask(_batch_size, _overrun, _allow_split_request);
    if (move_task_to_batch(batchTask)) {
      batchTask.merge_tasks();
      _fn(&batchTask.in(), &batchTask.out());
      batchTask.notify_tasks();
    }
  }

  return 0;
}

template <typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(
    const void* in, void* out, MempoolRegion* memoryPtr) {  // NOLINT
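  // Delegates to the TaskExecutor registered for this model index and keeps
  // the returned handler so wait() can block on it.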
  TaskHandler<TaskT> handler =
      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(
          in, out, memoryPtr);

  if (handler.valid()) {
    _task_owned = handler;
    return true;
  } else {
    LOG(ERROR) << "failed to schedule task";
    return false;
  }
}

template <typename InItemT, typename OutItemT>
void TaskManager<InItemT, OutItemT>::wait() {
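  // Blocks until the worker side signals completion through the task's pipe
  // (presumably by writing to write_fd once results are ready), then closes
  // both ends.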
  char buffer[128];
  while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
         errno == EINTR) {
  }

  close(_task_owned.read_fd);
  close(_task_owned.write_fd);

  _task_owned.read_fd = -1;
  _task_owned.write_fd = -1;
  return;
}
}  // namespace bsf
}  // namespace im