bsf-inl.h 6.3 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

W
wangguibao 已提交
15 16
#pragma once

W
wangguibao 已提交
17 18 19
#ifdef BCLOUD
#include <base/atomicops.h>
#else
W
wangguibao 已提交
20
#include <butil/atomicops.h>
W
wangguibao 已提交
21 22
#endif

W
wangguibao 已提交
23 24
#include <sys/syscall.h>
#include <boost/bind.hpp>
W
wangguibao 已提交
25

G
guru4elephant 已提交
26
#include "core/predictor/common/inner_common.h"
W
wangguibao 已提交
27 28 29 30

namespace im {
namespace bsf {

W
wangguibao 已提交
31
template <typename TaskT>
W
wangguibao 已提交
32
void* TaskExecutor<TaskT>::thread_entry(void* args) {
W
wangguibao 已提交
33 34 35 36
  ThreadContext<TaskT>* context = static_cast<ThreadContext<TaskT>*>(args);
  TaskExecutor<TaskT>* executor =
      static_cast<TaskExecutor<TaskT>*>(context->executor);
  executor->work(context);
W
wangguibao 已提交
37

W
wangguibao 已提交
38
  return NULL;
W
wangguibao 已提交
39 40
}

W
wangguibao 已提交
41
template <typename TaskT>
W
wangguibao 已提交
42
int TaskExecutor<TaskT>::start(uint32_t thread_num, uint32_t init_timeout_sec) {
W
wangguibao 已提交
43 44 45 46 47
  _stop = false;
  if (!_thread_contexts.empty()) {
    LOG(WARNING) << "BSF has started";
    return 0;
  }
W
wangguibao 已提交
48

W
wangguibao 已提交
49 50 51 52
  if (thread_num == 0) {
    LOG(ERROR) << "cannot init BSF with zero thread";
    return -1;
  }
W
wangguibao 已提交
53

W
wangguibao 已提交
54 55 56 57 58
  ThreadContext<TaskT>* contexts = new ThreadContext<TaskT>[thread_num];
  for (uint32_t i = 0; i < thread_num; ++i) {
    contexts[i].executor = this;
    if (_user_thread_contexts != NULL) {
      contexts[i].user_thread_context = _user_thread_contexts[i];
W
wangguibao 已提交
59 60
    }

W
wangguibao 已提交
61 62 63 64 65 66 67
    int rc = THREAD_CREATE(
        &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]);
    if (rc != 0) {
      LOG(ERROR) << "failed to create BSF worker thread: index=" << i
                 << ", rc=" << rc << ", errno=" << errno << ":"
                 << strerror(errno);
      return -1;
W
wangguibao 已提交
68 69
    }

W
wangguibao 已提交
70 71
    _thread_contexts.push_back(&contexts[i]);
  }
W
wangguibao 已提交
72

W
wangguibao 已提交
73 74
  int init_timeout = init_timeout_sec * 1000 * 1000;
  bool has_error = false;
W
wangguibao 已提交
75

W
wangguibao 已提交
76 77 78 79
  bool has_timeout = true;
  if (init_timeout == 0) {
    has_timeout = false;
  }
W
wangguibao 已提交
80

W
wangguibao 已提交
81 82
  while (!has_timeout || init_timeout > 0) {
    bool done = true;
W
wangguibao 已提交
83
    for (size_t i = 0; i < _thread_contexts.size(); ++i) {
W
wangguibao 已提交
84 85 86 87
      if (_thread_contexts[i]->init_status < 0) {
        has_error = true;
        break;
      }
W
wangguibao 已提交
88

W
wangguibao 已提交
89 90 91
      if (_thread_contexts[i]->init_status == 0) {
        done = false;
      }
W
wangguibao 已提交
92 93
    }

W
wangguibao 已提交
94 95 96
    if (has_error) {
      LOG(ERROR) << "BSF thread init error";
      return -1;
W
wangguibao 已提交
97 98
    }

W
wangguibao 已提交
99 100 101
    if (done) {
      LOG(INFO) << "BSF thread init done";
      return 0;
W
wangguibao 已提交
102 103
    }

W
wangguibao 已提交
104 105 106 107 108
    // 100ms
    const int sleep_interval = 100 * 1000;
    usleep(sleep_interval);
    init_timeout -= sleep_interval;
  }
W
wangguibao 已提交
109

W
wangguibao 已提交
110 111 112
  LOG(ERROR) << "BSF thread init timed out";
  return -1;
}
W
wangguibao 已提交
113

W
wangguibao 已提交
114 115 116 117 118 119 120 121 122 123 124 125
template <typename TaskT>
void TaskExecutor<TaskT>::stop() {
  _stop = true;
  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_CANCEL(_thread_contexts[i]->tid);
  }

  for (size_t i = 0; i < _thread_contexts.size(); ++i) {
    THREAD_JOIN(_thread_contexts[i]->tid, NULL);
  }
  _thread_contexts.clear();
}
W
wangguibao 已提交
126

W
wangguibao 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
template <typename TaskT>
TaskHandler<TaskT> TaskExecutor<TaskT>::schedule(const InArrayT& in,
                                                 OutArrayT& out) {  // NOLINT
  TaskT* task = butil::get_object<TaskT>();
  if (!task) {
    LOG(ERROR) << "Failed get TaskT from object pool";
    return TaskHandler<TaskT>::valid_handle();
  }

  if (!BatchTasks<TaskT>::check_valid(in, out, _batch_align)) {
    LOG(ERROR) << "Invalid input & output";
    return TaskHandler<TaskT>::valid_handle();
  }

  int fds[2];
  int rc = pipe(fds);
  if (rc != 0) {
    LOG(ERROR) << "call pipe() failed, errno=" << errno << ":"
               << strerror(errno);
    return TaskHandler<TaskT>::valid_handle();
  }

  task->read_fd = fds[0];
  task->write_fd = fds[1];
  task->owner_tid = ::syscall(SYS_gettid);

  task->in = &in;
  task->out = &out;
  task->rem = in.size();
  task->size = in.size();
  task->index.store(0, butil::memory_order_relaxed);

  AutoMutex lock(_mut);
  _task_queue.push_back(task);
  THREAD_COND_SIGNAL(&_cond);

  return TaskHandler<TaskT>(*task);
W
wangguibao 已提交
164 165
}

W
wangguibao 已提交
166 167 168 169 170 171
template <typename TaskT>
bool TaskExecutor<TaskT>::fetch_batch(BatchTasks<TaskT>& batch) {  // NOLINT
  AutoMutex lock(_mut);
  while (_task_queue.empty()) {
    THREAD_COND_WAIT(&_cond, &_mut);
  }
W
wangguibao 已提交
172

W
wangguibao 已提交
173 174 175 176
  if (_task_queue.empty()) {
    LOG(ERROR) << "invalid task queue!";
    return false;
  }
W
wangguibao 已提交
177

W
wangguibao 已提交
178 179 180 181 182
  while (!_task_queue.empty()) {
    TaskT* task = _task_queue.front();
    size_t rem = batch.append_task(task);
    if (task->rem <= 0) {
      _task_queue.pop_front();
W
wangguibao 已提交
183
    }
W
wangguibao 已提交
184 185
    if (rem <= 0) break;
  }
W
wangguibao 已提交
186

W
wangguibao 已提交
187
  return true;
W
wangguibao 已提交
188 189
}

W
wangguibao 已提交
190
template <typename TaskT>
W
wangguibao 已提交
191
int TaskExecutor<TaskT>::work(ThreadContext<TaskT>* context) {
W
wangguibao 已提交
192 193 194 195 196 197 198
  if (_thread_init_fn != NULL) {
    if (_thread_init_fn(context->user_thread_context) != 0) {
      LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit";
      context->init_status = -1;
      return -1;
    } else {
      LOG(INFO) << "execute thread init thunk succeed";
W
wangguibao 已提交
199
    }
W
wangguibao 已提交
200
  }
W
wangguibao 已提交
201

W
wangguibao 已提交
202 203 204 205 206 207
  context->init_status = 1;
  while (!_stop) {
    if (_thread_reset_fn != NULL) {
      if (_thread_reset_fn(context->user_thread_context) != 0) {
        LOG(ERROR) << "execute user thread reset failed";
      }
W
wangguibao 已提交
208 209
    }

W
wangguibao 已提交
210 211 212 213 214 215 216 217 218
    BatchTasks<TaskT> batch(_batch_size, _batch_align);
    if (fetch_batch(batch)) {
      batch.merge_tasks();
      _fn(batch.in(), batch.out());
      batch.notify_tasks();
    }
  }

  return 0;
W
wangguibao 已提交
219 220
}

W
wangguibao 已提交
221 222 223 224
template <typename InItemT, typename OutItemT>
bool TaskManager<InItemT, OutItemT>::schedule(const InArrayT& in,
                                              OutArrayT& out) {  // NOLINT
  TaskHandler<TaskT> handler = _executor.schedule(in, out);
W
wangguibao 已提交
225

W
wangguibao 已提交
226 227 228 229 230 231 232
  if (handler.valid()) {
    _task_owned = handler;
    return true;
  } else {
    LOG(ERROR) << "failed to schedule task";
    return false;
  }
W
wangguibao 已提交
233 234
}

W
wangguibao 已提交
235
template <typename InItemT, typename OutItemT>
W
wangguibao 已提交
236
void TaskManager<InItemT, OutItemT>::wait() {
W
wangguibao 已提交
237 238 239 240
  char buffer[128];
  while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 &&
         errno == EINTR) {
  }
W
wangguibao 已提交
241

W
wangguibao 已提交
242 243
  close(_task_owned.read_fd);
  close(_task_owned.write_fd);
W
wangguibao 已提交
244

W
wangguibao 已提交
245 246 247
  _task_owned.read_fd = -1;
  _task_owned.write_fd = -1;
  return;
W
wangguibao 已提交
248
}
W
wangguibao 已提交
249 250
}  // namespace bsf
}  // namespace im