// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #pragma once #ifdef BCLOUD #include #else #include #endif #include #include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/memory.h" namespace im { namespace bsf { template void* TaskExecutor::thread_entry(void* args) { ThreadContext* context = static_cast*>(args); TaskExecutor* executor = static_cast*>(context->executor); executor->work(context); return nullptr; } template int TaskExecutor::start(uint32_t thread_num, uint32_t init_timeout_sec) { _stop = false; if (!_thread_contexts.empty()) { LOG(WARNING) << "BSF has started"; return 0; } if (thread_num == 0) { LOG(ERROR) << "cannot init BSF with zero thread"; return -1; } ThreadContext* contexts = new ThreadContext[thread_num]; for (uint32_t i = 0; i < thread_num; ++i) { contexts[i].executor = this; if (_user_thread_contexts != NULL) { contexts[i].user_thread_context = _user_thread_contexts[i]; } int rc = THREAD_CREATE( &contexts[i].tid, NULL, &TaskExecutor::thread_entry, &contexts[i]); if (rc != 0) { LOG(ERROR) << "failed to create BSF worker thread: index=" << i << ", rc=" << rc << ", errno=" << errno << ":" << strerror(errno); return -1; } _thread_contexts.push_back(&contexts[i]); } int init_timeout = init_timeout_sec * 1000 * 1000; bool has_error = false; bool has_timeout = true; if (init_timeout == 0) { has_timeout = false; } while (!has_timeout || init_timeout > 0) { bool done = true; for (size_t i = 0; i < _thread_contexts.size(); ++i) { if (_thread_contexts[i]->init_status < 0) { has_error = true; break; } if (_thread_contexts[i]->init_status == 0) { done = false; } } if (has_error) { LOG(ERROR) << "BSF thread init error"; return -1; } if (done) { LOG(INFO) << "BSF thread init done"; return 0; } // 100ms const int sleep_interval = 100 * 1000; usleep(sleep_interval); init_timeout -= sleep_interval; } LOG(ERROR) << "BSF thread init timed out"; return -1; } template void TaskExecutor::stop() { _stop = true; for (size_t i = 0; i < _thread_contexts.size(); ++i) { THREAD_CANCEL(_thread_contexts[i]->tid); } for (size_t i = 0; i < _thread_contexts.size(); ++i) { THREAD_JOIN(_thread_contexts[i]->tid, NULL); } _thread_contexts.clear(); } template TaskHandler TaskExecutor::schedule( const void* inVectorT_ptr, void* outVectorT_ptr) { // NOLINT TaskT* task = butil::get_object(); if (!task) { LOG(ERROR) << "Failed get TaskT from object pool"; return TaskHandler::valid_handle(); } /* if (!BatchTasks::check_valid(in, out, _batch_align)) { LOG(ERROR) << "Invalid input & output"; return TaskHandler::valid_handle(); } */ int fds[2]; int rc = pipe(fds); if (rc != 0) { LOG(ERROR) << "call pipe() failed, errno=" << errno << ":" << strerror(errno); return TaskHandler::valid_handle(); } task->read_fd = fds[0]; task->write_fd = fds[1]; task->owner_tid = ::syscall(SYS_gettid); task->inVectorT_ptr = (const InVectorT*)inVectorT_ptr; task->outVectorT_ptr = (OutVectorT*)outVectorT_ptr; task->rem = task->batch_size(); task->index.store(0, butil::memory_order_relaxed); AutoMutex lock(_mut); _task_queue.push_back(task); THREAD_COND_SIGNAL(&_cond); return TaskHandler(*task); } // this function is accessed by multi thread. // so AutoMutex at first. // so batch.append_task is thread safe. // you dont need to add extra lock in append_task() template bool TaskExecutor::move_task_to_batch( BatchTasks& batch) { // NOLINT AutoMutex lock(_mut); while (_task_queue.empty()) { THREAD_COND_WAIT(&_cond, &_mut); } if (_task_queue.empty()) { LOG(ERROR) << "invalid task queue!"; return false; } while (!_task_queue.empty()) { TaskT* task = _task_queue.front(); size_t rem = batch.append_task(task); if (task->rem <= 0) { _task_queue.pop_front(); } if (rem <= 0) break; } return true; } // this function is accessed by multi thread. // move_task_to_batch have add lock inside the function. // Packaging 1 TaskT as 1 or Several TaskMeta. // TaskT is from the SingleTon TaskExecutor`s _task_queue // although TaskMeta is a local variable, but several TaskMeta may points to // the same TaskT which is get from the SingleTon TaskExecutor`s _task_queue. // put TaskMeta to the local variable BatchTasks batch. // batch.merge_tasks() and batch.notify_tasks() has no lock. // BatchTasks batch itself is a local variable, it`s thread safe. // If batch.merge_tasks() and batch.notify_tasks() do something to TaskMeta // you need to pay attention to that. // Multi-Thread deal with different TaskMeta(cause it`s created as local // variable) // But different TaskMeta may points to the same TaskT // which is get from the SingleTon TaskExecutor`s _task_queue. template int TaskExecutor::work(ThreadContext* context) { if (MempoolWrapper::instance().thread_initialize() != 0) { LOG(ERROR) << "Failed thread initialize mempool"; return -1; } if (_thread_init_fn != NULL) { if (_thread_init_fn(context->user_thread_context) != 0) { LOG(ERROR) << "execute thread init thunk failed, BSF thread will exit"; context->init_status = -1; return -1; } else { LOG(INFO) << "execute thread init thunk succeed"; } } context->init_status = 1; while (!_stop) { if (_thread_reset_fn != NULL) { if (_thread_reset_fn(context->user_thread_context) != 0) { LOG(ERROR) << "execute user thread reset failed"; } } if (MempoolWrapper::instance().thread_clear() != 0) { LOG(ERROR) << "Failed thread clear mempool"; return -1; } BatchTasks batch(_batch_size, _batch_align); if (move_task_to_batch(batch)) { batch.merge_tasks(); _fn(&batch.in(), &batch.out()); batch.notify_tasks(); } } return 0; } template bool TaskManager::schedule(const void* in, void* out) { // NOLINT TaskHandler handler = TaskExecutorVector::instance()[_model_index].schedule(in, out); if (handler.valid()) { _task_owned = handler; return true; } else { LOG(ERROR) << "failed to schedule task"; return false; } } template void TaskManager::wait() { char buffer[128]; while (read(_task_owned.read_fd, buffer, sizeof(buffer)) < 0 && errno == EINTR) { } close(_task_owned.read_fd); close(_task_owned.write_fd); _task_owned.read_fd = -1; _task_owned.write_fd = -1; return; } } // namespace bsf } // namespace im