bsf-inl.h 14.4 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

W
wangguibao 已提交
15 16
#pragma once

W
wangguibao 已提交
17 18 19
#ifdef BCLOUD
#include <base/atomicops.h>
#else
W
wangguibao 已提交
20
#include <butil/atomicops.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23 24
#include <sys/syscall.h>
#include <boost/bind.hpp>
W
wangguibao 已提交
25

G
guru4elephant 已提交
26
#include "core/predictor/common/inner_common.h"
27
#include "core/predictor/framework/memory.h"
W
wangguibao 已提交
28

H
HexToString 已提交
29
// this file is included by bsf.h
W
wangguibao 已提交
30 31 32
namespace im {
namespace bsf {

H
HexToString 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& baskTask) {
  // 双检锁,减少加锁的粒度
  if (!fetch_init) {
    if (taskmeta_num > 1) {
      // 对于task被拆分为多个taskmeta,需要加锁。
      AutoMutex lock(task_mut);
      task_fetch_create(baskTask);
    } else {
      // 对于task只有1个taskmeta,不需要加锁。
      task_fetch_create(baskTask);
    }
  }
  return true;
}

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& baskTask) {
  if (!fetch_init) {
    vector_fetch_lod_index = baskTask.vector_fetch_lod_index;
    set_fetch_nobatch_index = baskTask.set_fetch_nobatch_index;
    OutVectorT taskMetaOutLodTensor;
    size_t fetchvar_num = baskTask._batch_out.size();
    for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
         ++fetchvar_index) {
      size_t fetchvar_bytesize_index =
          baskTask.fetchvar_bytesize(fetchvar_index);
      size_t fetchvar_batch = 0;
      // 1. nobatch fetchvar情况
      if (set_fetch_nobatch_index.size() > 0 &&
          set_fetch_nobatch_index.find(fetchvar_index) !=
              set_fetch_nobatch_index.end()) {
        fetchvar_batch = 1;
      } else if (vector_fetch_lod_index.size() > 0 &&
                 std::find(vector_fetch_lod_index.begin(),
                           vector_fetch_lod_index.end(),
                           fetchvar_index) != vector_fetch_lod_index.end()) {
        // lod fetchvar情况,此时无法确定总的shape[0]
        // 根据task中的task_num总数开辟task_num个临时空间
        // 每个lod型的fetchvar拷贝到对应的临时空间中
        // 最后再计算临时空间的总量,合并fetchvar和lod
        fetchvar_batch = 0;

      } else {
        // 普通fetchvar情况,此时该Task总的fetchvar_batch =
        // 输入的总的batch_size()
        fetchvar_batch = batch_size();
      }
      paddle::PaddleTensor tensor_out;
      tensor_out.name = baskTask._batch_out[fetchvar_index].name;
      tensor_out.dtype =
          paddle::PaddleDType(baskTask._batch_out[fetchvar_index].dtype);
      tensor_out.shape = baskTask._batch_out[fetchvar_index].shape;
      tensor_out.shape[0] = fetchvar_batch;
      if (fetchvar_batch != 0) {
        // 此时 lod 为空。
        tensor_out.lod = baskTask._batch_out[fetchvar_index].lod;
        // resize all batch memory at one time
        size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
        tensor_out.data.Resize(databuf_size);
      } else {
        // 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task
        // 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy

        // 当task被分为多个taskMeta时,需要临时对象记录
        // 收齐后再一起合并
        if (taskmeta_num > 1) {
          taskMetaOutLodTensor.push_back(tensor_out);
        }
      }
      outVectorT_ptr->push_back(tensor_out);
    }
    // outLodTensorVector实际是一个双层vector
    // shape为taskmeta_num * vector_fetch_lod_index.size();
    outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
    fetch_init = true;
  }
  return true;
}

W
wangguibao 已提交
113
template <typename TaskT>
W
wangguibao 已提交
114
void* TaskExecutor<TaskT>::thread_entry(void* args) {
W
wangguibao 已提交
115 116 117 118
  ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
  TaskExecutor<TaskT>* executor =
      static_cast<TaskExecutor<TaskT>*>(context->executor);
  executor->work(context);
W
wangguibao 已提交
119

H
HexToString 已提交
120
  return nullptr;
W
wangguibao 已提交
121 122
}

W
wangguibao 已提交
123
template <typename TaskT>
W
wangguibao 已提交
124
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
W
wangguibao 已提交
125 126 127 128 129
  _stop = false;
  if (!_thread_contexts.empty()) {
    LOG(WARNING) << "BSF has started";
    return 0;
  }
W
wangguibao 已提交
130

W
wangguibao 已提交
131 132 133 134
  if (thread_num == 0) {
    LOG(ERROR) << "cannot init BSF with zero thread";
    return -1;
  }
W
wangguibao 已提交
135

W
wangguibao 已提交
136 137 138 139 140
  ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
  for (uint32_t i = 0; i < thread_num; ++i) {
    contexts[i].executor = this;
    if (_user_thread_contexts != NULL) {
      contexts[i].user_thread_context = _user_thread_contexts[i];
W
wangguibao 已提交
141 142
    }

W
wangguibao 已提交
143 144 145 146 147 148 149
    int rc = THREAD_CREATE(
        &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
    if (rc != 0) {
      LOG(ERROR) << "failed to create BSF worker thread: index=" << i
                 << ", rc=" << rc << ", errno=" << errno << ":"
                 << strerror(errno);
      return -1;
W
wangguibao 已提交
150 151
    }

W
wangguibao 已提交
152 153
    _thread_contexts.push_back(&contexts[i]);
  }
W
wangguibao 已提交
154

H
HexToString 已提交
155
  size_t init_timeout = init_timeout_sec * 1000 * 1000;
W
wangguibao 已提交
156
  bool has_error = false;
W
wangguibao 已提交
157

W
wangguibao 已提交
158 159 160 161
  bool has_timeout = true;
  if (init_timeout == 0) {
    has_timeout = false;
  }
W
wangguibao 已提交
162

W
wangguibao 已提交
163 164
  while (!has_timeout || init_timeout > 0) {
    bool done = true;
W
wangguibao 已提交
165
    for (size_t i = 0; i < _thread_contexts.size(); ++i) {
W
wangguibao 已提交
166 167 168 169
      if (_thread_contexts[i]->init_status < 0) {
        has_error = true;
        break;
      }
W
wangguibao 已提交
170

W
wangguibao 已提交
171 172 173
      if (_thread_contexts[i]->init_status == 0) {
        done = false;
      }
W
wangguibao 已提交
174 175
    }

W
wangguibao 已提交
176 177 178
    if (has_error) {
      LOG(ERROR) << "BSF thread init error";
      return -1;
W
wangguibao 已提交
179 180
    }

W
wangguibao 已提交
181 182 183
    if (done) {
      LOG(INFO) << "BSF thread init done";
      return 0;
W
wangguibao 已提交
184 185
    }

W
wangguibao 已提交
186
    // 100ms
H
HexToString 已提交
187
    const size_t sleep_interval = 100 * 1000;
W
wangguibao 已提交
188 189 190
    usleep(sleep_interval);
    init_timeout -= sleep_interval;
  }
W
wangguibao 已提交
191

W
wangguibao 已提交
192 193 194
  LOG(ERROR) << "BSF thread init timed out";
  return -1;
}
W
wangguibao 已提交
195

W
wangguibao 已提交
196 197 198 199 200 201 202 203 204 205 206 207
template <typename TaskT>
void TaskExecutor<TaskT>::stop() {
  _stop = true;
  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_CANCEL(_thread_contexts[i]->tid);
  }

  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_JOIN(_thread_contexts[i]->tid, NULL);
  }
  _thread_contexts.clear();
}
W
wangguibao 已提交
208

W
wangguibao 已提交
209
template <typename TaskT>
210 211 212
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
    const void* inVectorT_ptr,
    void* outVectorT_ptr) {  // NOLINT
W
wangguibao 已提交
213 214 215 216 217 218
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed get TaskT from object pool";
    return TaskHandler<TaskT>::valid_handle();
  }

219
  /*
H
HexToString 已提交
220
  if (!BatchTasks<TaskT>::check_valid(in, out, _overrun)) {
W
wangguibao 已提交
221 222 223
    LOG(ERROR) << "Invalid input & output";
    return TaskHandler<TaskT>::valid_handle();
  }
224
  */
W
wangguibao 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237

  int fds[2];
  int rc = pipe(fds);
  if (rc != 0) {
    LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
               << strerror(errno);
    return TaskHandler<TaskT>::valid_handle();
  }

  task->read_fd = fds[0];
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);

238 239
  task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
  task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
H
HexToString 已提交
240 241 242
  if (!task->task_init()) {
    LOG(ERROR) << "task->init() failed";
  }
243
  task->rem = task->batch_size();
W
wangguibao 已提交
244 245 246 247 248 249
  task->index.store(0, butil::memory_order_relaxed);
  AutoMutex lock(_mut);
  _task_queue.push_back(task);
  THREAD_COND_SIGNAL(&_cond);

  return TaskHandler<TaskT>(*task);
W
wangguibao 已提交
250 251
}

252 253
// this function is accessed by multi thread.
// so AutoMutex at first.
H
HexToString 已提交
254
// so batchTask.append_task is thread safe.
255
// you dont need to add extra lock in append_task()
H
HexToString 已提交
256
// task is already init.
W
wangguibao 已提交
257
template <typename TaskT>
258
bool TaskExecutor<TaskT>::move_task_to_batch(
H
HexToString 已提交
259
    BatchTasks<TaskT>& batchTask) {  // NOLINT
W
wangguibao 已提交
260 261 262 263
  AutoMutex lock(_mut);
  while (_task_queue.empty()) {
    THREAD_COND_WAIT(&_cond, &_mut);
  }
W
wangguibao 已提交
264

W
wangguibao 已提交
265 266 267 268
  if (_task_queue.empty()) {
    LOG(ERROR) << "invalid task queue!";
    return false;
  }
W
wangguibao 已提交
269

H
HexToString 已提交
270
  TaskT* previous_task = nullptr;
W
wangguibao 已提交
271 272
  while (!_task_queue.empty()) {
    TaskT* task = _task_queue.front();
H
HexToString 已提交
273

H
HexToString 已提交
274 275 276
    // 由于无法确定fetchVar是否为lod(即使输入是非lod,输出也可能是lod)
    // 简单的处理方法是:task不能被拆分,即用户的请求可以合并一起预测,但不能拆分两个小部分去预测。
    // 只需要设置engine的属性allow_split_request = false即可。
H
HexToString 已提交
277

H
HexToString 已提交
278 279
    // 复杂的处理方法是允许拆分Task,无论是否包含lod.
    // 难点:预测前,能够知道被拆成了几个taskmeta,但只有预测后,才知道有多少个fetchvar,多少个lod的fetchvar
H
HexToString 已提交
280 281
    // 所以,task中先要创建taskmeta_num* fetchvar
    // num(lod类型的)个临时PaddleTensor(存储data及Lod)
H
HexToString 已提交
282 283 284
    // 由于多线程调度的单位是taskmeta,故只能在notify_task中,用taskmeta->task去创建
    // 此时由于多个taskmeta对应一个task,存在多线程竞争,所以需要在task中加锁。
    // 原子操作不可行,因为多个线程必须等待创建好上述的PaddleTensor后才能继续。
H
HexToString 已提交
285 286
    // 对于普通的fetch,也需要加锁去创建PaddleTensor,后续才能往里拷贝。

H
HexToString 已提交
287 288 289 290 291 292
    // _overrun表示,异步BatchTasks是否允许单次临时超过限制。
    // _overrun为true时,即使BatchTasks剩下1-batch,也会全放入一个完整的Task,允许临时超限。
    // _overrun为false时,不允许。
    // 对于模型本身有最大Batch限制的情况,应将该值设为false,默认为false。
    // 对于模型本身无最大Batch限制,但自己设置了BatchTasks的最大Batch,可以考虑设置为True。

H
HexToString 已提交
293 294 295 296
    // _allow_split_request ==
    // true,则允许拆分task.BatchTasks剩下1-batch,则会从下一个Task中拆出1-Batch
    // _allow_split_request ==
    // false,则每个task不会被拆分。BatchTasks剩下1-batch会被浪费
H
HexToString 已提交
297 298 299
    // 默认为true,允许拆分task从而使得空间利用率最大。
    if (!batchTask.get_allow_split_request()) {
      if (task->batch_size() > batchTask.get_rem_size() &&
H
HexToString 已提交
300
          !batchTask.get_overrun()) {
H
HexToString 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313
        break;
      }
    }

    // combine_task_valid负责判断是否能够合并
    // 除最外层的shape外,内层shape应一致才能合并。
    // 否则跳出循环,放入下一个batchTask中。
    // 以此保证batch.append_task(task)中的task的内层shape相同。

    // 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值
    // 所以要求该feedvar必须相等,才能合并。
    // 否则跳出循环,放入下一个batchTask中。
    // 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存.
H
HexToString 已提交
314
    // TODO(HexToString): 可以考虑后期支持AutoPadding.
H
HexToString 已提交
315 316 317 318 319 320 321
    if (previous_task != nullptr) {
      if (!task->combine_task_valid(previous_task)) {
        break;
      }
    }
    size_t rem = batchTask.append_task(task);
    previous_task = task;
W
wangguibao 已提交
322 323
    if (task->rem <= 0) {
      _task_queue.pop_front();
W
wangguibao 已提交
324
    }
W
wangguibao 已提交
325 326
    if (rem <= 0) break;
  }
W
wangguibao 已提交
327

W
wangguibao 已提交
328
  return true;
W
wangguibao 已提交
329 330
}

331 332 333 334 335 336
// this function is accessed by multi thread.
// move_task_to_batch have add lock inside the function.
// Packaging 1 TaskT as 1 or Several TaskMeta.
// TaskT is from the SingleTon TaskExecutor`s _task_queue
// although TaskMeta is a local variable, but several TaskMeta may points to
// the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue.
H
HexToString 已提交
337
// put TaskMeta to the local variable BatchTasks<TaskT> batchTask.
338

H
HexToString 已提交
339 340 341 342
// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock.
// BatchTasks<TaskT> batchTask itself is a local variable, it`s thread safe.
// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
// TaskMeta
343 344 345 346 347 348
// you need to pay attention to that.
// Multi-Thread deal with different TaskMeta(cause it`s created as local
// variable)
// But different TaskMeta may points to the same TaskT
// which is get from the SingleTon TaskExecutor`s _task_queue.

W
wangguibao 已提交
349
template <typename TaskT>
W
wangguibao 已提交
350
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
351 352 353 354 355
  if (MempoolWrapper::instance().thread_initialize() != 0) {
    LOG(ERROR) << "Failed thread initialize mempool";
    return -1;
  }

W
wangguibao 已提交
356 357 358 359 360 361 362
  if (_thread_init_fn != NULL) {
    if (_thread_init_fn(context->user_thread_context) != 0) {
      LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
      context->init_status = -1;
      return -1;
    } else {
      LOG(INFO) << "execute thread init thunk succeed";
W
wangguibao 已提交
363
    }
W
wangguibao 已提交
364
  }
W
wangguibao 已提交
365

W
wangguibao 已提交
366 367 368 369 370 371
  context->init_status = 1;
  while (!_stop) {
    if (_thread_reset_fn != NULL) {
      if (_thread_reset_fn(context->user_thread_context) != 0) {
        LOG(ERROR) << "execute user thread reset failed";
      }
W
wangguibao 已提交
372 373
    }

374 375 376 377 378
    if (MempoolWrapper::instance().thread_clear() != 0) {
      LOG(ERROR) << "Failed thread clear mempool";
      return -1;
    }

H
HexToString 已提交
379 380 381
    // move_task_to_batch() take the original task from the `_task_queue`
    // put the original task into its own Vector<taskmeta>
    // the capacity of its own Vector<taskmeta> is decided by `_batch_size` or
H
HexToString 已提交
382
    // `_overrun`
H
HexToString 已提交
383 384 385 386 387 388 389 390

    // merge_tasks() move the imput-data into `_batch_in` from its own
    // Vector<taskmeta>.
    // because the predictor`s input is the `_batch_in`

    // notify_tasks() move the output-data into every single taskmeta from
    // `_batch_out`.
    // because the predictor`s output is the `_batch_out`
H
HexToString 已提交
391
    BatchTasks<TaskT> batchTask(_batch_size, _overrun, _allow_split_request);
H
HexToString 已提交
392 393 394 395
    if (move_task_to_batch(batchTask)) {
      batchTask.merge_tasks();
      _fn(&batchTask.in(), &batchTask.out());
      batchTask.notify_tasks();
W
wangguibao 已提交
396 397 398 399
    }
  }

  return 0;
W
wangguibao 已提交
400 401
}

W
wangguibao 已提交
402
template <typename InItemT, typename OutItemT>
403 404
bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
                                              void* out) {  // NOLINT
H
HexToString 已提交
405 406
  TaskHandler<TaskT> handler =
      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
W
wangguibao 已提交
407

W
wangguibao 已提交
408 409 410 411 412 413 414
  if (handler.valid()) {
    _task_owned = handler;
    return true;
  } else {
    LOG(ERROR) << "failed to schedule task";
    return false;
  }
W
wangguibao 已提交
415 416
}

W
wangguibao 已提交
417
template <typename InItemT, typename OutItemT>
W
wangguibao 已提交
418
void TaskManager<InItemT, OutItemT>::wait() {
W
wangguibao 已提交
419 420 421 422
  char buffer[128];
  while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
         errno == EINTR) {
  }
W
wangguibao 已提交
423

W
wangguibao 已提交
424 425
  close(_task_owned.read_fd);
  close(_task_owned.write_fd);
W
wangguibao 已提交
426

W
wangguibao 已提交
427 428 429
  _task_owned.read_fd = -1;
  _task_owned.write_fd = -1;
  return;
W
wangguibao 已提交
430
}
W
wangguibao 已提交
431 432
}  // namespace bsf
}  // namespace im