// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #include #include #include #include "common/inner_common.h" namespace im { namespace bsf { template void* TaskExecutor::thread_entry(void* args) { ThreadContext* context = static_cast*>(args); TaskExecutor* executor = static_cast*>(context->executor); executor->work(context); return NULL; } template int TaskExecutor::start(uint32_t thread_num, uint32_t init_timeout_sec) { _stop = false; if (!_thread_contexts.empty()) { LOG(WARNING) << "BSF has started"; return 0; } if (thread_num == 0) { LOG(ERROR) << "cannot init BSF with zero thread"; return -1; } ThreadContext* contexts = new ThreadContext[thread_num]; for (uint32_t i = 0; i < thread_num; ++i) { contexts[i].executor = this; if (_user_thread_contexts != NULL) { contexts[i].user_thread_context = _user_thread_contexts[i]; } int rc = THREAD_CREATE( &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]); if (rc != 0) { LOG(ERROR) << "failed to create BSF worker thread: index=" << i << ", rc=" << rc << ", errno=" << errno << ":" << strerror(errno); return -1; } _thread_contexts.push_back(&contexts[i]); } int init_timeout = init_timeout_sec * 1000 * 1000; bool has_error = false; bool has_timeout = true; if (init_timeout == 0) { has_timeout = false; } while (!has_timeout || init_timeout > 0) { bool done = true; for (size_t i = 0; i < _thread_contexts.size(); ++i) { if (_thread_contexts[i]->init_status < 0) { has_error = true; break; } if (_thread_contexts[i]->init_status == 0) { done = false; } } if (has_error) { LOG(ERROR) << "BSF thread init error"; return -1; } if (done) { LOG(INFO) << "BSF thread init done"; return 0; } // 100ms const int sleep_interval = 100 * 1000; usleep(sleep_interval); init_timeout -= sleep_interval; } LOG(ERROR) << "BSF thread init timed out"; return -1; } template void TaskExecutor::stop() { _stop = true; for (size_t i = 0; i < _thread_contexts.size(); ++i) { THREAD_CANCEL(_thread_contexts[i]->tid); } for (size_t i = 0; i < _thread_contexts.size(); ++i) { THREAD_JOIN(_thread_contexts[i]->tid, NULL); } _thread_contexts.clear(); } template TaskHandler TaskExecutor::schedule(const InArrayT& in, OutArrayT& out) { // NOLINT TaskT* task = butil::get_object(); if (!task) { LOG(ERROR) << "Failed get TaskT from object pool"; return TaskHandler::valid_handle(); } if (!BatchTasks::check_valid(in, out, _batch_align)) { LOG(ERROR) << "Invalid input & output"; return TaskHandler::valid_handle(); } int fds[2]; int rc = pipe(fds); if (rc != 0) { LOG(ERROR) << "call pipe() failed, errno=" << errno << ":" << strerror(errno); return TaskHandler::valid_handle(); } task->read_fd = fds[0]; task->write_fd = fds[1]; task->owner_tid = ::syscall(SYS_gettid); task->in = ∈ task->out = &out; task->rem = in.size(); task->size = in.size(); task->index.store(0, butil::memory_order_relaxed); AutoMutex lock(_mut); _task_queue.push_back(task); THREAD_COND_SIGNAL(&_cond); return TaskHandler(*task); } template bool TaskExecutor::fetch_batch(BatchTasks& batch) { // NOLINT AutoMutex lock(_mut); while (_task_queue.empty()) { THREAD_COND_WAIT(&_cond, &_mut); } if (_task_queue.empty()) { LOG(ERROR) << "invalid task queue!"; return false; } while (!_task_queue.empty()) { TaskT* task = _task_queue.front(); size_t rem = batch.append_task(task); if (task->rem <= 0) { _task_queue.pop_front(); } if (rem <= 0) break; } return true; } template int TaskExecutor::work(ThreadContext* context) { if (_thread_init_fn != NULL) { if (_thread_init_fn(context->user_thread_context) != 0) { LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit"; context->init_status = -1; return -1; } else { LOG(INFO) << "execute thread init thunk succeed"; } } context->init_status = 1; while (!_stop) { if (_thread_reset_fn != NULL) { if (_thread_reset_fn(context->user_thread_context) != 0) { LOG(ERROR) << "execute user thread reset failed"; } } BatchTasks batch(_batch_size, _batch_align); if (fetch_batch(batch)) { batch.merge_tasks(); _fn(batch.in(), batch.out()); batch.notify_tasks(); } } return 0; } template bool TaskManager::schedule(const InArrayT& in, OutArrayT& out) { // NOLINT TaskHandler handler = _executor.schedule(in, out); if (handler.valid()) { _task_owned = handler; return true; } else { LOG(ERROR) << "failed to schedule task"; return false; } } template void TaskManager::wait() { char buffer[128]; while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 && errno == EINTR) { } close(_task_owned.read_fd); close(_task_owned.write_fd); _task_owned.read_fd = -1; _task_owned.write_fd = -1; return; } } // namespace bsf } // namespace im