Commit 2f75e797 authored by xiexionghang

use bthread for multi-thread pipeline

Parent 057a3ef9
#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"
namespace paddle {
namespace custom_trainer {
namespace feed {
void* execute_bthread_task(void* args) {
auto* param = reinterpret_cast<::std::tuple<std::packaged_task<void()>*, google::protobuf::Closure*>*>(args);
auto* task = ::std::get<0>(*param);
auto* closure = ::std::get<1>(*param);
(*task)();
if (closure != NULL) {
closure->Run();
}
delete task;
delete param;
return NULL;
}
} // namespace feed
} // namespace custom_trainer
} // namespace paddle
#pragma once

// Pre-define butil's logging include guard so the bthread headers do not
// pull in butil/logging.h, which clashes with glog.
#ifndef BUTIL_LOGGING_H_
#define BUTIL_LOGGING_H_
#endif

#include <tuple>
#include <future>
#include <functional>
#include <forward_list>
#include "glog/logging.h"
#include "google/protobuf/stubs/callback.h"
#include "bthread/bthread.h"
#include "bthread/mutex.h"
namespace paddle {
namespace custom_trainer {
namespace feed {

void* execute_bthread_task(void* args);

// Fire-and-forget task runner backed by bthread; tasks are wrapped in a
// std::packaged_task and executed on background bthreads.
class BthreadTaskRunner {
public:
    static BthreadTaskRunner& instance() {
        static BthreadTaskRunner runner;
        return runner;
    }

    // Schedules func(args...) on a background bthread. Returns 0 on success;
    // returns -1 and frees the allocated task if the bthread fails to start.
    template <class Callable, class... Args>
    int add_task(Callable&& func, Args&&... args) {
        bthread_t th;
        auto* task = new std::packaged_task<void()>(
            std::bind(std::forward<Callable>(func), std::forward<Args>(args)...));
        auto* param = new ::std::tuple<std::packaged_task<void()>*, google::protobuf::Closure*>(
            task, NULL);
        if (0 != bthread_start_background(&th, NULL, execute_bthread_task, param)) {
            delete task;
            delete param;
            return -1;
        }
        return 0;
    }

private:
    BthreadTaskRunner() {}
    ~BthreadTaskRunner() {}
};

}  // namespace feed
}  // namespace custom_trainer
}  // namespace paddle
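
Since add_task exposes no future or join handle, callers track completion themselves. A minimal usage sketch, assuming the header above is on the include path; the counter and the task body are illustrative, not part of this commit:

#include <atomic>
#include <unistd.h>
#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"

using paddle::custom_trainer::feed::BthreadTaskRunner;

// Illustrative only: submit a few tasks, then poll an atomic counter for
// completion — the same pattern the pipeline below uses.
void submit_example() {
    std::atomic<int> pending(0);
    for (int i = 0; i < 8; ++i) {
        pending += 1;
        int rc = BthreadTaskRunner::instance().add_task([&pending]() {
            // ... per-task work would go here ...
            pending -= 1;
        });
        if (rc != 0) {
            pending -= 1;  // task never started, undo the count
        }
    }
    while (pending > 0) {
        usleep(10000);  // no join/future is exposed, so poll
    }
}
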
@@ -1,5 +1,6 @@
 #pragma once
 #include <thread>
 #include "paddle/fluid/framework/archive.h"
+#include "paddle/fluid/train/custom_trainer/feed/common/bthread_task_runner.h"
 namespace paddle {
 namespace custom_trainer {
@@ -18,7 +19,6 @@ class PipelineOptions {
 public:
     PipelineOptions() = default;
     uint32_t batch_size = 10;          // batch size emitted by the pipe
-    uint32_t thread_num = 1;           // number of concurrent converter threads
     float input_output_rate = 1;       // input/output QPS ratio
     uint32_t buffer_batch_count = 4;   // number of batches the pipe buffers ahead
     bool need_hold_input_data = false; // whether to hold input data; otherwise it is released after consumption
@@ -56,9 +56,7 @@ public:
         _output_channel->SetBlockSize(options.batch_size);
         size_t input_batch_size = options.batch_size * options.input_output_rate;
         _input_channel->SetBlockSize(input_batch_size);
-        _input_data_buffer.resize(input_batch_size * options.buffer_batch_count);
-        _output_data_buffer.resize(options.batch_size * options.buffer_batch_count);
-        _output_channel->SetCapacity(_output_data_buffer.size());
+        _output_channel->SetCapacity(options.batch_size * options.buffer_batch_count);
         if (_options.need_hold_input_data) {
             _input_channel_backup = ::paddle::framework::MakeChannel<TypeIn>();
             _input_channel_backup->SetBlockSize(input_batch_size);
@@ -75,7 +73,7 @@ public:
         PipelineOptions& options, PipeDataConverter data_converter) {
         // keep the global batch size consistent across connected pipes
         options.batch_size = pre_pipeline.options().batch_size / options.input_output_rate;
-        return initialize(pre_pipeline.options(), pre_pipeline.output_chnnel(), data_converter);
+        return initialize(options, pre_pipeline.output_chnnel(), data_converter);
     }
     virtual ~Pipeline() {
@@ -106,23 +104,39 @@ public:
 private:
     void async_convert_data() {
         size_t input_batch_size = _options.batch_size * _options.input_output_rate;
+        size_t input_data_max = input_batch_size * _options.buffer_batch_count;
+        std::atomic<int> parsing_num(0);
         while (!_is_read_end) {
-            while (_output_channel->Size() < _input_data_buffer.size()) {
-                size_t read_size = _input_channel->
-                    Read(input_batch_size, &_input_data_buffer[0]);
+            while (!_is_read_end && parsing_num < input_data_max) {
+                auto input_data_buffer = std::make_shared<std::vector<TypeIn>>(input_batch_size);
+                size_t read_size = _input_channel->Read(input_batch_size, input_data_buffer->data());
                 if (read_size == 0) {
                     _is_read_end = true;
                     break;
                 }
-                size_t write_size = 0;
-                CHECK(_converter(&_input_data_buffer[0], read_size,
-                    &_output_data_buffer[0], &write_size, 0) == 0) << "Data Converter Do Failed";
-                _output_channel->WriteMove(write_size, &_output_data_buffer[0]);
-                if (_input_channel_backup) {
-                    _input_channel_backup->WriteMove(read_size, &_input_data_buffer[0]);
-                }
+                parsing_num += read_size;
+                BthreadTaskRunner::instance().add_task(
+                    [this, &parsing_num, read_size, input_data_buffer]() {
+                        size_t write_size = 0;
+                        std::vector<TypeOut> output_data_buffer(_options.batch_size);
+                        _converter(input_data_buffer->data(), read_size,
+                            &output_data_buffer[0], &write_size, 0);
+                        _output_channel->WriteMove(write_size, &output_data_buffer[0]);
+                        if (_input_channel_backup) {
+                            _input_channel_backup->WriteMove(read_size, input_data_buffer->data());
+                        }
+                        parsing_num -= read_size;
+                    });
             }
-            sleep(1);
+            // Offline scenario: the buffer between batch consumers is ample, so a large
+            // gap is acceptable, and sleeping costs less than a condition variable or
+            // yield without hurting throughput. An option could be added for other
+            // scenarios; only the offline case is implemented here.
+            while (!_is_read_end && parsing_num >= input_data_max) {
+                usleep(50000);  // 50ms
+            }
         }
+        while (parsing_num > 0) {
+            usleep(100000);  // 100ms
+        }
         _output_channel->Close();
         if (_input_channel_backup) {
@@ -136,8 +150,6 @@ private:
     bool _is_read_end = false;     // set once the input stream has been fully read
     PipelineOptions _options;      // pipeline options
     PipeDataConverter _converter;  // converter
-    std::vector<TypeIn> _input_data_buffer;    // input data buffer
-    std::vector<TypeOut> _output_data_buffer;  // output data buffer
     std::shared_ptr<std::thread> _convert_thread;  // async convert thread
     ::paddle::framework::Channel<TypeIn> _input_channel;         // input stream
     ::paddle::framework::Channel<TypeIn> _input_channel_backup;  // backup of the raw input stream
......
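
The heart of this change is the bounded in-flight counter in async_convert_data(): the reader increments parsing_num before dispatching a chunk to a bthread, each worker decrements it when done, and the reader backs off once input_data_max items are outstanding. A simplified, self-contained sketch of the same back-pressure pattern, with std::thread standing in for bthread and all names and sizes illustrative:

#include <algorithm>
#include <atomic>
#include <chrono>
#include <thread>

std::atomic<int> parsing_num(0);
const int input_data_max = 16 * 4;  // input_batch_size * buffer_batch_count

void reader_loop(int total_items) {
    int produced = 0;
    while (produced < total_items) {
        // Back-pressure: pause dispatch while too much work is in flight.
        while (parsing_num >= input_data_max) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        int chunk = std::min(16, total_items - produced);
        parsing_num += chunk;  // count before dispatch, as the pipeline does
        std::thread([chunk]() {
            // ... convert the chunk and write it downstream ...
            parsing_num -= chunk;  // signal completion
        }).detach();
        produced += chunk;
    }
    // Drain all outstanding work before closing the output channel.
    while (parsing_num > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
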
#pragma once

// Hide BLOG (butil logging) so it does not clash with glog.
#ifndef BUTIL_LOGGING_H_
#define BUTIL_LOGGING_H_
#endif

// butil logs at NOTICE severity, which glog lacks; map it onto INFO.
#ifndef COMPACT_GOOGLE_LOG_NOTICE
#define COMPACT_GOOGLE_LOG_NOTICE COMPACT_GOOGLE_LOG_INFO
#endif
#include "communicate/ps_server.h"
#include "communicate/ps_client.h"
......
@@ -12,9 +12,12 @@ int MultiThreadExecutor::initialize(YAML::Node exe_config,
     _trainer_context = context_ptr.get();
     _train_data_name = exe_config["train_data_name"].as<std::string>();
     _train_batch_size = exe_config["train_batch_size"].as<int>();
+    // Currently unused; later each stream may get its own thread pool, or stream data priorities.
     _input_parse_thread_num = exe_config["input_parse_thread_num"].as<int>();
     _push_gradient_thread_num = exe_config["push_gradient_thread_num"].as<int>();
     _train_thread_num = exe_config["train_thread_num"].as<int>();
     _need_dump_all_model = exe_config["need_dump_all_model"].as<bool>();
     CHECK(_train_thread_num > 0 && _train_batch_size > 0);
     _thread_executors.resize(_train_thread_num);
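
For reference, a sketch of the executor section of the YAML config that initialize() reads; the key names match the exe_config lookups above, while the values are made up for the example:

# Illustrative values only; input_parse_thread_num and push_gradient_thread_num
# are still parsed but no longer drive the pipelines after this commit.
train_data_name: feed_data
train_batch_size: 32
input_parse_thread_num: 10
push_gradient_thread_num: 10
train_thread_num: 12
need_dump_all_model: false
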
@@ -84,7 +87,6 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
     PipelineOptions input_pipe_option;
     input_pipe_option.need_hold_input_data = true;
     input_pipe_option.batch_size = 1;
-    input_pipe_option.thread_num = _input_parse_thread_num;
     input_pipe_option.input_output_rate = _train_batch_size;
     input_pipe_option.buffer_batch_count = _train_thread_num;
     auto input_pipe = std::make_shared<Pipeline<DataItem, ScopePoolObj>>();
@@ -113,8 +115,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
     // training stream
     PipelineOptions train_pipe_option;
     train_pipe_option.input_output_rate = 1;
-    train_pipe_option.thread_num = _train_thread_num;
-    train_pipe_option.buffer_batch_count = 2 * _train_thread_num;
+    train_pipe_option.buffer_batch_count = _train_thread_num;
     auto train_pipe = std::make_shared<Pipeline<ScopePoolObj, ScopePoolObj>>();
     train_pipe->connect_to(*input_pipe, train_pipe_option,
         [this] (ScopePoolObj* in_items, size_t in_num,
@@ -126,13 +127,12 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
             out_items[out_idx] = std::move(in_items[out_idx]);
         }
         return 0;
-            });
+        });
     // gradient push-back stream
     PipelineOptions gradient_pipe_option;
     gradient_pipe_option.input_output_rate = 1;
-    gradient_pipe_option.thread_num = _push_gradient_thread_num;
-    gradient_pipe_option.buffer_batch_count = 2 * _train_thread_num;
+    gradient_pipe_option.buffer_batch_count = _train_thread_num;
     auto gradient_pipe = std::make_shared<Pipeline<ScopePoolObj, int>>();
     gradient_pipe->connect_to(*train_pipe, gradient_pipe_option,
         [epoch_id, this] (ScopePoolObj* in_items, size_t in_num,
@@ -155,7 +155,7 @@ paddle::framework::Channel<DataItem> MultiThreadExecutor::run(
                 delete[] samples;  // samples are reclaimed only after all pipes finish
             }
             return 0;
-            });
+        });
     // wait for the training stream to finish
     std::vector<int> gradient_status;
......
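
As a worked example of how batch sizes now propagate through the three pipes (assuming train_batch_size = 32 and train_thread_num = 12; the numbers are illustrative):

// input_pipe:    batch_size = 1, input_output_rate = train_batch_size = 32,
//                so it reads 32 DataItems for every ScopePoolObj it emits.
// train_pipe:    connect_to() sets batch_size = pre.batch_size / input_output_rate
//                = 1 / 1 = 1, i.e. one scope (one whole batch) per item.
// gradient_pipe: likewise batch_size = 1 / 1 = 1.
// Each pipe buffers buffer_batch_count = train_thread_num = 12 batches, so the
// input pipe's in-flight cap is input_data_max = 32 * 12 = 384 DataItems.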