From 2f75e797e407c296a22c810187cc1fff9f2b3196 Mon Sep 17 00:00:00 2001
From: xiexionghang
Date: Wed, 28 Aug 2019 12:10:26 +0800
Subject: [PATCH] use bthread for multi-thread pipeline

---
 .../feed/common/bthread_task_runner.cc        | 22 ++++++++
 .../feed/common/bthread_task_runner.h         | 50 +++++++++++++++++++
 .../custom_trainer/feed/common/pipeline.h     | 48 +++++++++++-------
 .../feed/common/pslib_warpper.h               |  5 ++
 .../feed/executor/multi_thread_executor.cc    | 14 +++---
 5 files changed, 114 insertions(+), 25 deletions(-)
 create mode 100644 paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.cc
 create mode 100644 paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h

diff --git a/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.cc b/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.cc
new file mode 100644
index 00000000..857b8040
--- /dev/null
+++ b/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.cc
@@ -0,0 +1,22 @@
+#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"
+
+namespace paddle {
+namespace custom_trainer {
+namespace feed {
+
+void* execute_bthread_task(void* args) {
+    auto* param = reinterpret_cast<::std::tuple<std::packaged_task<void()>*, google::protobuf::Closure*>*>(args);
+    auto* task = ::std::get<0>(*param);
+    auto* closure = ::std::get<1>(*param);
+    (*task)();
+    if (closure != NULL) {
+        closure->Run();
+    }
+    delete task;
+    delete param;
+    return NULL;
+}
+
+} // namespace feed
+} // namespace custom_trainer
+} // namespace paddle
diff --git a/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h b/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h
new file mode 100644
index 00000000..f0b646d9
--- /dev/null
+++ b/paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#ifndef BUTIL_LOGGING_H_
+#define BUTIL_LOGGING_H_
+#endif
+
+#include <tuple>
+#include <future>
+#include <utility>
+#include <functional>
+#include "glog/logging.h"
+#include "google/protobuf/stubs/callback.h"
+#include "bthread/bthread.h"
+#include "bthread/mutex.h"
+
+namespace paddle {
+namespace custom_trainer {
+namespace feed {
+
+void* execute_bthread_task(void* args);
+
+class BthreadTaskRunner {
+public:
+    static BthreadTaskRunner& instance() {
+        static BthreadTaskRunner runner;
+        return runner;
+    }
+
+    template <class Callable, class... Args>
+    int add_task(Callable &&func, Args &&... args) {
+        bthread_t th;
+        auto* task = new std::packaged_task<void()>(
+            std::bind(std::forward<Callable>(func), std::forward<Args>(args)...));
+        auto* param = new ::std::tuple<std::packaged_task<void()>*, google::protobuf::Closure*>(
+            ::std::move(task), NULL);
+        if (0 != bthread_start_background(&th, NULL, execute_bthread_task, param)) {
+            delete task;
+            delete param;
+            return -1;
+        }
+        return 0;
+    }
+private:
+    BthreadTaskRunner() {}
+    ~BthreadTaskRunner() {}
+};
+
+} // namespace feed
+} // namespace custom_trainer
+} // namespace paddle
diff --git a/paddle/fluid/train/custom_trainer/feed/common/pipeline.h b/paddle/fluid/train/custom_trainer/feed/common/pipeline.h
index 52ff8e46..588b1bc2 100644
--- a/paddle/fluid/train/custom_trainer/feed/common/pipeline.h
+++ b/paddle/fluid/train/custom_trainer/feed/common/pipeline.h
@@ -1,6 +1,7 @@
 #pragma once
 #include <thread>
 #include "paddle/fluid/framework/archive.h"
+#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"
 
 namespace paddle {
 namespace custom_trainer {
@@ -18,7 +19,6 @@ class PipelineOptions {
 public:
     PipelineOptions() = default;
     uint32_t batch_size = 10;           // batch size of the pipe output
-    uint32_t thread_num = 1;            // number of concurrent converter threads
     float input_output_rate = 1;        // input/output QPS ratio
     uint32_t buffer_batch_count = 4;    // number of batches pre-buffered by the pipe
     bool need_hold_input_data = false;  // hold the input stream data, or release it after consumption
@@ -56,9 +56,7 @@ public:
         _output_channel->SetBlockSize(options.batch_size);
         size_t input_batch_size = options.batch_size * options.input_output_rate;
         _input_channel->SetBlockSize(input_batch_size);
-        _input_data_buffer.resize(input_batch_size * options.buffer_batch_count);
-        _output_data_buffer.resize(options.batch_size * options.buffer_batch_count);
-        _output_channel->SetCapacity(_output_data_buffer.size());
+        _output_channel->SetCapacity(options.batch_size * options.buffer_batch_count);
         if (_options.need_hold_input_data) {
             _input_channel_backup = ::paddle::framework::MakeChannel<TypeIn>();
             _input_channel_backup->SetBlockSize(input_batch_size);
@@ -75,7 +73,7 @@ public:
         PipelineOptions& options, PipeDataConverter data_converter) {
         // keep the global batch size consistent
         options.batch_size = pre_pipeline.options().batch_size / options.input_output_rate;
-        return initialize(pre_pipeline.options(), pre_pipeline.output_chnnel(), data_converter);
+        return initialize(options, pre_pipeline.output_chnnel(), data_converter);
     }
 
     virtual ~Pipeline() {
@@ -106,23 +104,39 @@ public:
 private:
     void async_convert_data() {
         size_t input_batch_size = _options.batch_size * _options.input_output_rate;
+        size_t input_data_max = input_batch_size * _options.buffer_batch_count;
+        std::atomic<int> parsing_num(0);
         while (!_is_read_end) {
-            while (_output_channel->Size() < _input_data_buffer.size()) {
-                size_t read_size = _input_channel->
-                    Read(input_batch_size, &_input_data_buffer[0]);
+            while (!_is_read_end && parsing_num < input_data_max) {
+                auto input_data_buffer = std::make_shared<std::vector<TypeIn>>(input_batch_size);
+                size_t read_size = _input_channel->Read(input_batch_size, input_data_buffer->data());
                 if (read_size == 0) {
                     _is_read_end = true;
                     break;
                 }
-                size_t write_size = 0;
-                CHECK(_converter(&_input_data_buffer[0], read_size,
-                    &_output_data_buffer[0], &write_size, 0) == 0) << "Data Converter Do Failed";
-                _output_channel->WriteMove(write_size, &_output_data_buffer[0]);
-                if (_input_channel_backup) {
-                    _input_channel_backup->WriteMove(read_size, &_input_data_buffer[0]);
-                }
+                parsing_num += read_size;
+                BthreadTaskRunner::instance().add_task(
+                    [this, &parsing_num, read_size, input_data_buffer](){
+                        size_t write_size = 0;
+                        std::vector<TypeOut> output_data_buffer(_options.batch_size);
+                        _converter(input_data_buffer->data(), read_size,
+                            &output_data_buffer[0], &write_size, 0);
+                        _output_channel->WriteMove(write_size, &output_data_buffer[0]);
+                        if (_input_channel_backup) {
+                            _input_channel_backup->WriteMove(read_size, input_data_buffer->data());
+                        }
+                        parsing_num -= read_size;
+                    });
             }
-            sleep(1);
+            // In the offline scenario the buffer between batch consumers is ample, so a large gap is acceptable.
+            // Compared with a condition variable or yield, sleep saves resources without hurting throughput.
+            // A config option could be added for other scenarios; this implementation targets offline only.
+            while (!_is_read_end && parsing_num >= input_data_max) {
+                usleep(50000);  // 50ms
+            }
+        }
+        while (parsing_num > 0) {
+            usleep(100000);  // 100ms
         }
         _output_channel->Close();
         if (_input_channel_backup) {
@@ -136,8 +150,6 @@ private:
     bool _is_read_end = false;                     // input stream fully consumed
     PipelineOptions _options;                      // pipe options
     PipeDataConverter _converter;                  // converter
-    std::vector<TypeIn> _input_data_buffer;        // input data buffer
-    std::vector<TypeOut> _output_data_buffer;      // output data buffer
    std::shared_ptr<std::thread> _convert_thread;  // async convert thread
     ::paddle::framework::Channel<TypeIn> _input_channel;         // input stream
     ::paddle::framework::Channel<TypeIn> _input_channel_backup;  // backup of the raw input stream
diff --git a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h
index 10406d2b..7e68337c 100644
--- a/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h
+++ b/paddle/fluid/train/custom_trainer/feed/common/pslib_warpper.h
@@ -1,8 +1,13 @@
 #pragma once
 
 // Hide BLOG
+#ifndef BUTIL_LOGGING_H_
 #define BUTIL_LOGGING_H_
+#endif
+#ifndef COMPACT_GOOGLE_LOG_NOTICE
 #define COMPACT_GOOGLE_LOG_NOTICE COMPACT_GOOGLE_LOG_INFO
+#endif
+
 
 #include "communicate/ps_server.h"
 #include "communicate/ps_client.h"
diff --git a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc
index 309e453a..28d0eb41 100644
--- a/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc
+++ b/paddle/fluid/train/custom_trainer/feed/executor/multi_thread_executor.cc
@@ -12,9 +12,12 @@ int MultiThreadExecutor::initialize(YAML::Node exe_config,
     _trainer_context = context_ptr.get();
     _train_data_name = exe_config["train_data_name"].as<std::string>();
     _train_batch_size = exe_config["train_batch_size"].as<int>();
+
+    // Not used yet; each stream may later get its own thread pool, or priorities for stream data
     _input_parse_thread_num = exe_config["input_parse_thread_num"].as<int>();
     _push_gradient_thread_num = exe_config["push_gradient_thread_num"].as<int>();
     _train_thread_num = exe_config["train_thread_num"].as<int>();
+
     _need_dump_all_model = exe_config["need_dump_all_model"].as<bool>();
     CHECK(_train_thread_num > 0 && _train_batch_size > 0);
     _thread_executors.resize(_train_thread_num);
@@ -84,7 +87,6 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
     PipelineOptions input_pipe_option;
     input_pipe_option.need_hold_input_data = true;
     input_pipe_option.batch_size = 1;
-    input_pipe_option.thread_num = _input_parse_thread_num;
     input_pipe_option.input_output_rate = _train_batch_size;
     input_pipe_option.buffer_batch_count = _train_thread_num;
     auto input_pipe = std::make_shared<Pipeline<DataItem, ScopePoolObj>>();
@@ -113,8 +115,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
     // training stream
     PipelineOptions train_pipe_option;
     train_pipe_option.input_output_rate = 1;
-    train_pipe_option.thread_num = _train_thread_num;
-    train_pipe_option.buffer_batch_count = 2 * _train_thread_num;
+    train_pipe_option.buffer_batch_count = _train_thread_num;
     auto train_pipe = std::make_shared<Pipeline<ScopePoolObj, ScopePoolObj>>();
     train_pipe->connect_to(*input_pipe, train_pipe_option,
         [this] (ScopePoolObj* in_items, size_t in_num,
@@ -126,13 +127,12 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
             out_items[out_idx] = std::move(in_items[out_idx]);
         }
         return 0;
-    });
+    });
 
     // gradient push-back stream
     PipelineOptions gradient_pipe_option;
     gradient_pipe_option.input_output_rate = 1;
-    gradient_pipe_option.thread_num = _push_gradient_thread_num;
-    gradient_pipe_option.buffer_batch_count = 2 * _train_thread_num;
+    gradient_pipe_option.buffer_batch_count = _train_thread_num;
     auto gradient_pipe = std::make_shared<Pipeline<ScopePoolObj, int>>();
     gradient_pipe->connect_to(*train_pipe, gradient_pipe_option,
         [epoch_id, this] (ScopePoolObj* in_items, size_t in_num,
@@ -155,7 +155,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
             delete[] samples;  // reclaim samples only after all pipes have finished
         }
         return 0;
-    });
+    });
 
     // wait for the training stream to finish
     std::vector<int> gradient_status;
-- 
GitLab
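
Usage note (not part of the patch): the new BthreadTaskRunner is deliberately fire-and-forget — add_task() returns 0 once the bthread has started and hands back no future or join handle, so callers track completion themselves, as pipeline.h does with its parsing_num counter. Below is a minimal sketch of driving the runner; the print_sum helper and the trailing sleep() are illustrative assumptions, not code from this patch.

#include <unistd.h>
#include <cstdio>
#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"

using paddle::custom_trainer::feed::BthreadTaskRunner;

// Hypothetical helper, only to show argument forwarding through add_task().
void print_sum(int a, int b) {
    printf("sum: %d\n", a + b);
}

int main() {
    // The callable and its arguments are bound into a std::packaged_task<void()>
    // and run on a background bthread; a non-zero return means
    // bthread_start_background failed to launch the task.
    if (BthreadTaskRunner::instance().add_task(print_sum, 1, 2) != 0) {
        return -1;
    }
    // Lambdas work the same way, as in Pipeline::async_convert_data().
    BthreadTaskRunner::instance().add_task([] { printf("hello from a bthread\n"); });
    sleep(1);  // crude wait: the runner offers no join, so completion is untracked here
    return 0;
}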