bsf-inl.h 7.7 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

W
wangguibao 已提交
15 16
#pragma once

W
wangguibao 已提交
17 18 19
#ifdef BCLOUD
#include <base/atomicops.h>
#else
W
wangguibao 已提交
20
#include <butil/atomicops.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23 24
#include <sys/syscall.h>
#include <boost/bind.hpp>
W
wangguibao 已提交
25

G
guru4elephant 已提交
26
#include "core/predictor/common/inner_common.h"
27
#include "core/predictor/framework/memory.h"
W
wangguibao 已提交
28 29 30 31

namespace im {
namespace bsf {

W
wangguibao 已提交
32
template <typename TaskT>
W
wangguibao 已提交
33
void* TaskExecutor<TaskT>::thread_entry(void* args) {
W
wangguibao 已提交
34 35 36 37
  ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
  TaskExecutor<TaskT>* executor =
      static_cast<TaskExecutor<TaskT>*>(context->executor);
  executor->work(context);
W
wangguibao 已提交
38

H
HexToString 已提交
39
  return nullptr;
W
wangguibao 已提交
40 41
}

W
wangguibao 已提交
42
template <typename TaskT>
W
wangguibao 已提交
43
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
W
wangguibao 已提交
44 45 46 47 48
  _stop = false;
  if (!_thread_contexts.empty()) {
    LOG(WARNING) << "BSF has started";
    return 0;
  }
W
wangguibao 已提交
49

W
wangguibao 已提交
50 51 52 53
  if (thread_num == 0) {
    LOG(ERROR) << "cannot init BSF with zero thread";
    return -1;
  }
W
wangguibao 已提交
54

W
wangguibao 已提交
55 56 57 58 59
  ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
  for (uint32_t i = 0; i < thread_num; ++i) {
    contexts[i].executor = this;
    if (_user_thread_contexts != NULL) {
      contexts[i].user_thread_context = _user_thread_contexts[i];
W
wangguibao 已提交
60 61
    }

W
wangguibao 已提交
62 63 64 65 66 67 68
    int rc = THREAD_CREATE(
        &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
    if (rc != 0) {
      LOG(ERROR) << "failed to create BSF worker thread: index=" << i
                 << ", rc=" << rc << ", errno=" << errno << ":"
                 << strerror(errno);
      return -1;
W
wangguibao 已提交
69 70
    }

W
wangguibao 已提交
71 72
    _thread_contexts.push_back(&contexts[i]);
  }
W
wangguibao 已提交
73

H
HexToString 已提交
74
  size_t init_timeout = init_timeout_sec * 1000 * 1000;
W
wangguibao 已提交
75
  bool has_error = false;
W
wangguibao 已提交
76

W
wangguibao 已提交
77 78 79 80
  bool has_timeout = true;
  if (init_timeout == 0) {
    has_timeout = false;
  }
W
wangguibao 已提交
81

W
wangguibao 已提交
82 83
  while (!has_timeout || init_timeout > 0) {
    bool done = true;
W
wangguibao 已提交
84
    for (size_t i = 0; i < _thread_contexts.size(); ++i) {
W
wangguibao 已提交
85 86 87 88
      if (_thread_contexts[i]->init_status < 0) {
        has_error = true;
        break;
      }
W
wangguibao 已提交
89

W
wangguibao 已提交
90 91 92
      if (_thread_contexts[i]->init_status == 0) {
        done = false;
      }
W
wangguibao 已提交
93 94
    }

W
wangguibao 已提交
95 96 97
    if (has_error) {
      LOG(ERROR) << "BSF thread init error";
      return -1;
W
wangguibao 已提交
98 99
    }

W
wangguibao 已提交
100 101 102
    if (done) {
      LOG(INFO) << "BSF thread init done";
      return 0;
W
wangguibao 已提交
103 104
    }

W
wangguibao 已提交
105
    // 100ms
H
HexToString 已提交
106
    const size_t sleep_interval = 100 * 1000;
W
wangguibao 已提交
107 108 109
    usleep(sleep_interval);
    init_timeout -= sleep_interval;
  }
W
wangguibao 已提交
110

W
wangguibao 已提交
111 112 113
  LOG(ERROR) << "BSF thread init timed out";
  return -1;
}
W
wangguibao 已提交
114

W
wangguibao 已提交
115 116 117 118 119 120 121 122 123 124 125 126
// Stops the executor: raises the _stop flag, issues THREAD_CANCEL for every
// recorded worker thread, THREAD_JOINs each of them, and finally clears the
// list of thread contexts.
template <typename TaskT>
void TaskExecutor<TaskT>::stop() {
  _stop = true;

  // Request cancellation of every worker before joining any of them.
  for (const auto& ctx : _thread_contexts) {
    THREAD_CANCEL(ctx->tid);
  }

  for (const auto& ctx : _thread_contexts) {
    THREAD_JOIN(ctx->tid, NULL);
  }

  _thread_contexts.clear();
}
W
wangguibao 已提交
127

W
wangguibao 已提交
128
template <typename TaskT>
129 130 131
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(
    const void* inVectorT_ptr,
    void* outVectorT_ptr) {  // NOLINT
W
wangguibao 已提交
132 133 134 135 136 137
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed get TaskT from object pool";
    return TaskHandler<TaskT>::valid_handle();
  }

138
  /*
W
wangguibao 已提交
139 140 141 142
  if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
    LOG(ERROR) << "Invalid input & output";
    return TaskHandler<TaskT>::valid_handle();
  }
143
  */
W
wangguibao 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156

  int fds[2];
  int rc = pipe(fds);
  if (rc != 0) {
    LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
               << strerror(errno);
    return TaskHandler<TaskT>::valid_handle();
  }

  task->read_fd = fds[0];
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);

157 158 159
  task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr;
  task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr;
  task->rem = task->batch_size();
W
wangguibao 已提交
160 161 162 163 164 165 166
  task->index.store(0, butil::memory_order_relaxed);

  AutoMutex lock(_mut);
  _task_queue.push_back(task);
  THREAD_COND_SIGNAL(&_cond);

  return TaskHandler<TaskT>(*task);
W
wangguibao 已提交
167 168
}

169 170 171 172
// this function is accessed by multi thread.
// so AutoMutex at first.
// so batch.append_task is thread safe.
// you dont need to add extra lock in append_task()
W
wangguibao 已提交
173
template <typename TaskT>
174 175
bool TaskExecutor<TaskT>::move_task_to_batch(
    BatchTasks<TaskT>& batch) {  // NOLINT
W
wangguibao 已提交
176 177 178 179
  AutoMutex lock(_mut);
  while (_task_queue.empty()) {
    THREAD_COND_WAIT(&_cond, &_mut);
  }
W
wangguibao 已提交
180

W
wangguibao 已提交
181 182 183 184
  if (_task_queue.empty()) {
    LOG(ERROR) << "invalid task queue!";
    return false;
  }
W
wangguibao 已提交
185

W
wangguibao 已提交
186 187 188 189 190
  while (!_task_queue.empty()) {
    TaskT* task = _task_queue.front();
    size_t rem = batch.append_task(task);
    if (task->rem <= 0) {
      _task_queue.pop_front();
W
wangguibao 已提交
191
    }
W
wangguibao 已提交
192 193
    if (rem <= 0) break;
  }
W
wangguibao 已提交
194

W
wangguibao 已提交
195
  return true;
W
wangguibao 已提交
196 197
}

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
// This function is called from multiple threads.
// move_task_to_batch() takes the lock inside the function.
// It packages one TaskT as one or several TaskMeta objects.
// The TaskT comes from the singleton TaskExecutor's _task_queue.
// Although a TaskMeta is a local variable, several TaskMeta objects may point
// to the same TaskT taken from the singleton TaskExecutor's _task_queue.
// The TaskMeta objects are put into the local variable BatchTasks<TaskT> batch.

// batch.merge_tasks() and batch.notify_tasks() take no lock.
// BatchTasks<TaskT> batch is itself a local variable, so it is thread safe.
// If batch.merge_tasks() or batch.notify_tasks() does something to a TaskMeta,
// you need to pay attention to the following:
// each thread deals with different TaskMeta objects (because they are created
// as local variables),
// but different TaskMeta objects may point to the same TaskT,
// which is taken from the singleton TaskExecutor's _task_queue.

W
wangguibao 已提交
215
template <typename TaskT>
W
wangguibao 已提交
216
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
217 218 219 220 221
  if (MempoolWrapper::instance().thread_initialize() != 0) {
    LOG(ERROR) << "Failed thread initialize mempool";
    return -1;
  }

W
wangguibao 已提交
222 223 224 225 226 227 228
  if (_thread_init_fn != NULL) {
    if (_thread_init_fn(context->user_thread_context) != 0) {
      LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
      context->init_status = -1;
      return -1;
    } else {
      LOG(INFO) << "execute thread init thunk succeed";
W
wangguibao 已提交
229
    }
W
wangguibao 已提交
230
  }
W
wangguibao 已提交
231

W
wangguibao 已提交
232 233 234 235 236 237
  context->init_status = 1;
  while (!_stop) {
    if (_thread_reset_fn != NULL) {
      if (_thread_reset_fn(context->user_thread_context) != 0) {
        LOG(ERROR) << "execute user thread reset failed";
      }
W
wangguibao 已提交
238 239
    }

240 241 242 243 244
    if (MempoolWrapper::instance().thread_clear() != 0) {
      LOG(ERROR) << "Failed thread clear mempool";
      return -1;
    }

W
wangguibao 已提交
245
    BatchTasks<TaskT> batch(_batch_size, _batch_align);
246
    if (move_task_to_batch(batch)) {
W
wangguibao 已提交
247
      batch.merge_tasks();
248
      _fn(&batch.in(), &batch.out());
W
wangguibao 已提交
249 250 251 252 253
      batch.notify_tasks();
    }
  }

  return 0;
W
wangguibao 已提交
254 255
}

W
wangguibao 已提交
256
template <typename InItemT, typename OutItemT>
257 258
bool TaskManager<InItemT, OutItemT>::schedule(const void* in,
                                              void* out) {  // NOLINT
H
HexToString 已提交
259 260
  TaskHandler<TaskT> handler =
      TaskExecutorVector<TaskT>::instance()[_model_index].schedule(in, out);
W
wangguibao 已提交
261

W
wangguibao 已提交
262 263 264 265 266 267 268
  if (handler.valid()) {
    _task_owned = handler;
    return true;
  } else {
    LOG(ERROR) << "failed to schedule task";
    return false;
  }
W
wangguibao 已提交
269 270
}

W
wangguibao 已提交
271
template <typename InItemT, typename OutItemT>
W
wangguibao 已提交
272
void TaskManager<InItemT, OutItemT>::wait() {
W
wangguibao 已提交
273 274 275 276
  char buffer[128];
  while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
         errno == EINTR) {
  }
W
wangguibao 已提交
277

W
wangguibao 已提交
278 279
  close(_task_owned.read_fd);
  close(_task_owned.write_fd);
W
wangguibao 已提交
280

W
wangguibao 已提交
281 282 283
  _task_owned.read_fd = -1;
  _task_owned.write_fd = -1;
  return;
W
wangguibao 已提交
284
}
W
wangguibao 已提交
285 286
}  // namespace bsf
}  // namespace im