bsf-inl.h 13.6 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

W
wangguibao 已提交
15 16
#pragma once

W
wangguibao 已提交
17 18 19
#ifdef BCLOUD
#include <base/atomicops.h>
#else
W
wangguibao 已提交
20
#include <butil/atomicops.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23 24
#include <sys/syscall.h>
#include <boost/bind.hpp>
W
wangguibao 已提交
25

G
guru4elephant 已提交
26
#include "core/predictor/common/inner_common.h"
27
#include "core/predictor/framework/memory.h"
W
wangguibao 已提交
28

H
HexToString 已提交
29
// this file is included by bsf.h
W
wangguibao 已提交
30 31 32
namespace im {
namespace bsf {

H
HexToString 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_init(BatchTasks<TaskT>& baskTask) {
  // 双检锁,减少加锁的粒度
  if (!fetch_init) {
    if (taskmeta_num > 1) {
      // 对于task被拆分为多个taskmeta,需要加锁。
      AutoMutex lock(task_mut);
      task_fetch_create(baskTask);
    } else {
      // 对于task只有1个taskmeta,不需要加锁。
      task_fetch_create(baskTask);
    }
  }
  return true;
}

template <typename InItemT, typename OutItemT>
bool Task<InItemT, OutItemT>::task_fetch_create(BatchTasks<TaskT>& baskTask) {
  if (!fetch_init) {
    vector_fetch_lod_index = baskTask.vector_fetch_lod_index;
    set_fetch_nobatch_index = baskTask.set_fetch_nobatch_index;
    OutVectorT taskMetaOutLodTensor;
    size_t fetchvar_num = baskTask._batch_out.size();
    for (size_t fetchvar_index = 0; fetchvar_index < fetchvar_num;
         ++fetchvar_index) {
      size_t fetchvar_bytesize_index =
          baskTask.fetchvar_bytesize(fetchvar_index);
      size_t fetchvar_batch = 0;
      // 1. nobatch fetchvar情况
      if (set_fetch_nobatch_index.size() > 0 &&
          set_fetch_nobatch_index.find(fetchvar_index) !=
              set_fetch_nobatch_index.end()) {
        fetchvar_batch = 1;
      } else if (vector_fetch_lod_index.size() > 0 &&
                 std::find(vector_fetch_lod_index.begin(),
                           vector_fetch_lod_index.end(),
                           fetchvar_index) != vector_fetch_lod_index.end()) {
        // lod fetchvar情况,此时无法确定总的shape[0]
        // 根据task中的task_num总数开辟task_num个临时空间
        // 每个lod型的fetchvar拷贝到对应的临时空间中
        // 最后再计算临时空间的总量,合并fetchvar和lod
        fetchvar_batch = 0;

      } else {
        // 普通fetchvar情况,此时该Task总的fetchvar_batch =
        // 输入的总的batch_size()
        fetchvar_batch = batch_size();
      }
      paddle::PaddleTensor tensor_out;
      tensor_out.name = baskTask._batch_out[fetchvar_index].name;
      tensor_out.dtype =
          paddle::PaddleDType(baskTask._batch_out[fetchvar_index].dtype);
      tensor_out.shape = baskTask._batch_out[fetchvar_index].shape;
      tensor_out.shape[0] = fetchvar_batch;
      if (fetchvar_batch != 0) {
        // 此时 lod 为空。
        tensor_out.lod = baskTask._batch_out[fetchvar_index].lod;
        // resize all batch memory at one time
        size_t databuf_size = fetchvar_batch * fetchvar_bytesize_index;
        tensor_out.data.Resize(databuf_size);
      } else {
        // 当taskmeta_num = 1时,由于同时只有一个taskMeta操作task
        // 不涉及线程安全问题,所以此时可以直接由taskMeta->task->resize->copy

        // 当task被分为多个taskMeta时,需要临时对象记录
        // 收齐后再一起合并
        if (taskmeta_num > 1) {
          taskMetaOutLodTensor.push_back(tensor_out);
        }
      }
      outVectorT_ptr->push_back(tensor_out);
    }
    // outLodTensorVector实际是一个双层vector
    // shape为taskmeta_num * vector_fetch_lod_index.size();
    outLodTensorVector.resize(taskmeta_num, taskMetaOutLodTensor);
    fetch_init = true;
  }
  return true;
}

W
wangguibao 已提交
113
template <typename TaskT>
W
wangguibao 已提交
114
void* TaskExecutor<TaskT>::thread_entry(void* args) {
W
wangguibao 已提交
115 116 117 118
  ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
  TaskExecutor<TaskT>* executor =
      static_cast<TaskExecutor<TaskT>*>(context->executor);
  executor->work(context);
W
wangguibao 已提交
119

H
HexToString 已提交
120
  return nullptr;
W
wangguibao 已提交
121 122
}

W
wangguibao 已提交
123
template <typename TaskT>
W
wangguibao 已提交
124
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
W
wangguibao 已提交
125 126 127 128 129
  _stop = false;
  if (!_thread_contexts.empty()) {
    LOG(WARNING) << "BSF has started";
    return 0;
  }
W
wangguibao 已提交
130

W
wangguibao 已提交
131 132 133 134
  if (thread_num == 0) {
    LOG(ERROR) << "cannot init BSF with zero thread";
    return -1;
  }
W
wangguibao 已提交
135

W
wangguibao 已提交
136 137 138 139 140
  ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
  for (uint32_t i = 0; i < thread_num; ++i) {
    contexts[i].executor = this;
    if (_user_thread_contexts != NULL) {
      contexts[i].user_thread_context = _user_thread_contexts[i];
W
wangguibao 已提交
141 142
    }

W
wangguibao 已提交
143 144 145 146 147 148 149
    int rc = THREAD_CREATE(
        &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
    if (rc != 0) {
      LOG(ERROR) << "failed to create BSF worker thread: index=" << i
                 << ", rc=" << rc << ", errno=" << errno << ":"
                 << strerror(errno);
      return -1;
W
wangguibao 已提交
150 151
    }

W
wangguibao 已提交
152 153
    _thread_contexts.push_back(&contexts[i]);
  }
W
wangguibao 已提交
154

H
HexToString 已提交
155
  size_t init_timeout = init_timeout_sec * 1000 * 1000;
W
wangguibao 已提交
156
  bool has_error = false;
W
wangguibao 已提交
157

W
wangguibao 已提交
158 159 160 161
  bool has_timeout = true;
  if (init_timeout == 0) {
    has_timeout = false;
  }
W
wangguibao 已提交
162

W
wangguibao 已提交
163 164
  while (!has_timeout || init_timeout > 0) {
    bool done = true;
W
wangguibao 已提交
165
    for (size_t i = 0; i < _thread_contexts.size(); ++i) {
W
wangguibao 已提交
166 167 168 169
      if (_thread_contexts[i]->init_status < 0) {
        has_error = true;
        break;
      }
W
wangguibao 已提交
170

W
wangguibao 已提交
171 172 173
      if (_thread_contexts[i]->init_status == 0) {
        done = false;
      }
W
wangguibao 已提交
174 175
    }

W
wangguibao 已提交
176 177 178
    if (has_error) {
      LOG(ERROR) << "BSF thread init error";
      return -1;
W
wangguibao 已提交
179 180
    }

W
wangguibao 已提交
181 182 183
    if (done) {
      LOG(INFO) << "BSF thread init done";
      return 0;
W
wangguibao 已提交
184 185
    }

W
wangguibao 已提交
186
    // 100ms
H
HexToString 已提交
187
    const size_t sleep_interval = 100 * 1000;
W
wangguibao 已提交
188 189 190
    usleep(sleep_interval);
    init_timeout -= sleep_interval;
  }
W
wangguibao 已提交
191

W
wangguibao 已提交
192 193 194
  LOG(ERROR) << "BSF thread init timed out";
  return -1;
}
W
wangguibao 已提交
195

W
wangguibao 已提交
196 197 198 199 200 201 202 203 204 205 206 207
template <typename TaskT>
void TaskExecutor<TaskT>::stop() {
  _stop = true;
  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_CANCEL(_thread_contexts[i]->tid);
  }

  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_JOIN(_thread_contexts[i]->tid, NULL);
  }
  _thread_contexts.clear();
}
W
wangguibao 已提交
208

W
wangguibao 已提交
209
template <typename TaskT>
210 211 212
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
    const void* inVectorT_ptr,
    void* outVectorT_ptr) {  // NOLINT
W
wangguibao 已提交
213 214 215 216 217 218
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed get TaskT from object pool";
    return TaskHandler<TaskT>::valid_handle();
  }

219
  /*
W
wangguibao 已提交
220 221 222 223
  if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
    LOG(ERROR) << "Invalid input & output";
    return TaskHandler<TaskT>::valid_handle();
  }
224
  */
W
wangguibao 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237

  int fds[2];
  int rc = pipe(fds);
  if (rc != 0) {
    LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
               << strerror(errno);
    return TaskHandler<TaskT>::valid_handle();
  }

  task->read_fd = fds[0];
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);

238 239
  task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
  task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
H
HexToString 已提交
240 241 242
  if (!task->task_init()) {
    LOG(ERROR) << "task->init() failed";
  }
243
  task->rem = task->batch_size();
W
wangguibao 已提交
244 245 246 247 248 249
  task->index.store(0, butil::memory_order_relaxed);
  AutoMutex lock(_mut);
  _task_queue.push_back(task);
  THREAD_COND_SIGNAL(&_cond);

  return TaskHandler<TaskT>(*task);
W
wangguibao 已提交
250 251
}

252 253
// this function is accessed by multi thread.
// so AutoMutex at first.
H
HexToString 已提交
254
// so batchTask.append_task is thread safe.
255
// you dont need to add extra lock in append_task()
H
HexToString 已提交
256
// task is already init.
W
wangguibao 已提交
257
template <typename TaskT>
258
bool TaskExecutor<TaskT>::move_task_to_batch(
H
HexToString 已提交
259
    BatchTasks<TaskT>& batchTask) {  // NOLINT
W
wangguibao 已提交
260 261 262 263
  AutoMutex lock(_mut);
  while (_task_queue.empty()) {
    THREAD_COND_WAIT(&_cond, &_mut);
  }
W
wangguibao 已提交
264

W
wangguibao 已提交
265 266 267 268
  if (_task_queue.empty()) {
    LOG(ERROR) << "invalid task queue!";
    return false;
  }
W
wangguibao 已提交
269

H
HexToString 已提交
270
  TaskT* previous_task = nullptr;
W
wangguibao 已提交
271 272
  while (!_task_queue.empty()) {
    TaskT* task = _task_queue.front();
H
HexToString 已提交
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308

    // 由于无法确定fetchVar是否为lod,故单个task不能拆分放到多个batchTask中,否则后续组装很难完成。
    // 所以,task不能被拆分,即用户的请求可以合并一起预测,但不能拆分两个小部分去预测。
    // 难点:预测前,能够知道被拆成了几个taskmeta,但只有预测后,才知道有多少个fetchvar,多少个lod的fetchvar
    // 所以,task中想要创建taskmeta_num* lod的fetchvar num* PaddleBuf(以及Lod)
    // 只能在notify_task中,用taskmeta->task去创建,需要在task中加锁。
    // 原子操作不可行,因为多个线程必须等待创建好上述的PaddleBuf后才能继续。

    // 对于普通的fetch,也需要加锁去创建PaddleTensor,后续才能往里拷贝。

    // _batch_align为false时,即使空间小,也会全放入一个完整的Task,允许临时超限。
    // _allow_split_request == false,则每个task不会被拆分。
    // 默认为true,允许拆分task从而使得空间利用率最大。
    if (!batchTask.get_allow_split_request()) {
      if (task->batch_size() > batchTask.get_rem_size() &&
          batchTask.get_batch_align()) {
        break;
      }
    }

    // combine_task_valid负责判断是否能够合并
    // 除最外层的shape外,内层shape应一致才能合并。
    // 否则跳出循环,放入下一个batchTask中。
    // 以此保证batch.append_task(task)中的task的内层shape相同。

    // 对于Shape[0] = 1 而!=batch的情况,因为合并时,取其中一个的值
    // 所以要求该feedvar必须相等,才能合并。
    // 否则跳出循环,放入下一个batchTask中。
    // 目前没有PaddleTensor和PaddleBuff没有重载==,所以只能比较内存.
    if (previous_task != nullptr) {
      if (!task->combine_task_valid(previous_task)) {
        break;
      }
    }
    size_t rem = batchTask.append_task(task);
    previous_task = task;
W
wangguibao 已提交
309 310
    if (task->rem <= 0) {
      _task_queue.pop_front();
W
wangguibao 已提交
311
    }
W
wangguibao 已提交
312 313
    if (rem <= 0) break;
  }
W
wangguibao 已提交
314

W
wangguibao 已提交
315
  return true;
W
wangguibao 已提交
316 317
}

318 319 320 321 322 323
// this function is accessed by multi thread.
// move_task_to_batch have add lock inside the function.
// Packaging 1 TaskT as 1 or Several TaskMeta.
// TaskT is from the SingleTon TaskExecutor`s _task_queue
// although TaskMeta is a local variable, but several TaskMeta may points to
// the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue.
H
HexToString 已提交
324
// put TaskMeta to the local variable BatchTasks<TaskT> batchTask.
325

H
HexToString 已提交
326 327 328 329
// batchTask.merge_tasks() and batchTask.notify_tasks() has no lock.
// BatchTasks<TaskT> batchTask itself is a local variable, it`s thread safe.
// If batchTask.merge_tasks() and batchTask.notify_tasks() do something to
// TaskMeta
330 331 332 333 334 335
// you need to pay attention to that.
// Multi-Thread deal with different TaskMeta(cause it`s created as local
// variable)
// But different TaskMeta may points to the same TaskT
// which is get from the SingleTon TaskExecutor`s _task_queue.

W
wangguibao 已提交
336
template <typename TaskT>
W
wangguibao 已提交
337
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
338 339 340 341 342
  if (MempoolWrapper::instance().thread_initialize() != 0) {
    LOG(ERROR) << "Failed thread initialize mempool";
    return -1;
  }

W
wangguibao 已提交
343 344 345 346 347 348 349
  if (_thread_init_fn != NULL) {
    if (_thread_init_fn(context->user_thread_context) != 0) {
      LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
      context->init_status = -1;
      return -1;
    } else {
      LOG(INFO) << "execute thread init thunk succeed";
W
wangguibao 已提交
350
    }
W
wangguibao 已提交
351
  }
W
wangguibao 已提交
352

W
wangguibao 已提交
353 354 355 356 357 358
  context->init_status = 1;
  while (!_stop) {
    if (_thread_reset_fn != NULL) {
      if (_thread_reset_fn(context->user_thread_context) != 0) {
        LOG(ERROR) << "execute user thread reset failed";
      }
W
wangguibao 已提交
359 360
    }

361 362 363 364 365
    if (MempoolWrapper::instance().thread_clear() != 0) {
      LOG(ERROR) << "Failed thread clear mempool";
      return -1;
    }

H
HexToString 已提交
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
    // move_task_to_batch() take the original task from the `_task_queue`
    // put the original task into its own Vector<taskmeta>
    // the capacity of its own Vector<taskmeta> is decided by `_batch_size` or
    // `_batch_align`

    // merge_tasks() move the imput-data into `_batch_in` from its own
    // Vector<taskmeta>.
    // because the predictor`s input is the `_batch_in`

    // notify_tasks() move the output-data into every single taskmeta from
    // `_batch_out`.
    // because the predictor`s output is the `_batch_out`
    BatchTasks<TaskT> batchTask(
        _batch_size, _batch_align, _allow_split_request);
    if (move_task_to_batch(batchTask)) {
      batchTask.merge_tasks();
      _fn(&batchTask.in(), &batchTask.out());
      batchTask.notify_tasks();
W
wangguibao 已提交
384 385 386 387
    }
  }

  return 0;
W
wangguibao 已提交
388 389
}

W
wangguibao 已提交
390
template <typename InItemT, typename OutItemT>
391 392
bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
                                              void* out) {  // NOLINT
H
HexToString 已提交
393 394
  TaskHandler<TaskT> handler =
      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
W
wangguibao 已提交
395

W
wangguibao 已提交
396 397 398 399 400 401 402
  if (handler.valid()) {
    _task_owned = handler;
    return true;
  } else {
    LOG(ERROR) << "failed to schedule task";
    return false;
  }
W
wangguibao 已提交
403 404
}

W
wangguibao 已提交
405
template <typename InItemT, typename OutItemT>
W
wangguibao 已提交
406
void TaskManager<InItemT, OutItemT>::wait() {
W
wangguibao 已提交
407 408 409 410
  char buffer[128];
  while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
         errno == EINTR) {
  }
W
wangguibao 已提交
411

W
wangguibao 已提交
412 413
  close(_task_owned.read_fd);
  close(_task_owned.write_fd);
W
wangguibao 已提交
414

W
wangguibao 已提交
415 416 417
  _task_owned.read_fd = -1;
  _task_owned.write_fd = -1;
  return;
W
wangguibao 已提交
418
}
W
wangguibao 已提交
419 420
}  // namespace bsf
}  // namespace im