Commit f529675c authored by: S seiriosPlus

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into optimize/large_scale_kv_spped

...@@ -107,6 +107,9 @@ function(select_nvcc_arch_flags out_variable)
  elseif(${CUDA_ARCH_NAME} STREQUAL "Maxwell")
    set(cuda_arch_bin "50")
  elseif(${CUDA_ARCH_NAME} STREQUAL "Pascal")
    if (NOT ${CMAKE_CUDA_COMPILER_VERSION} LESS 10.0)
      add_definitions("-DSUPPORTS_CUDA_FP16")
    endif()
    set(cuda_arch_bin "60 61")
  elseif(${CUDA_ARCH_NAME} STREQUAL "Volta")
    if (NOT ${CMAKE_CUDA_COMPILER_VERSION} LESS 10.0)
......
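The guard above (`NOT ${CMAKE_CUDA_COMPILER_VERSION} LESS 10.0`, i.e. CUDA 10.0 or newer) turns half-precision support into a compile-time define. A minimal sketch of how C++ code can key off such a define; the function name here is illustrative, not Paddle's:

```cpp
#include <cstdio>

// Reports whether this translation unit was built with the fp16 define.
bool built_with_fp16() {
#ifdef SUPPORTS_CUDA_FP16
  return true;   // set by CMake for CUDA >= 10.0 on Pascal/Volta builds
#else
  return false;  // half-precision paths compiled out
#endif
}

int main() { std::printf("fp16 support: %d\n", built_with_fp16()); }
```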
...@@ -527,6 +527,8 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) {
        VLOG(0) << "error: the number of ids is a negative number: " << num;
        VLOG(0) << "please check line<" << instance_cout << "> in file<"
                << filename << ">";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: " << all_slots_.size();
        return false;
      } else if (num == 0) {
        VLOG(0)
...@@ -536,42 +538,66 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) {
               "characters.";
        VLOG(0) << "please check line<" << instance_cout << "> in file<"
                << filename << ">";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: " << all_slots_.size();
        return false;
      } else if (errno == ERANGE || num > INT_MAX) {
        VLOG(0) << "error: the number of ids is greater than INT_MAX";
        VLOG(0) << "please check line<" << instance_cout << "> in file<"
                << filename << ">";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: " << all_slots_.size();
        return false;
      }
      if (all_slots_type_[i] == "float") {
        for (int j = 0; j < num; ++j) {
          strtof(endptr, &endptr);
          if (errno == ERANGE) {
            VLOG(0) << "error: the value is out of the range of "
                       "representable values for float";
            VLOG(0) << "please check line<" << instance_cout << "> in file<"
                    << filename << ">";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: "
<< all_slots_.size();
VLOG(0) << "and in this slot: " << j
<< " th id with total id number: " << num;
            return false;
          }
          if (j + 1 != num && endptr - str == len) {
            VLOG(0) << "error: there is something wrong with the number of ids.";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: "
<< all_slots_.size();
VLOG(0) << "and in this slot: " << j
<< " th id with total id number: " << num;
VLOG(0) << "please check line<" << instance_cout << "> in file<" VLOG(0) << "please check line<" << instance_cout << "> in file<"
<< filename << ">"; << filename << ">";
return false; return false;
} }
} }
} else if (all_slots_type_[i] == "uint64") { } else if (all_slots_type_[i] == "uint64") {
for (int i = 0; i < num; ++i) { for (int j = 0; j < num; ++j) {
strtoull(endptr, &endptr, 10); strtoull(endptr, &endptr, 10);
if (errno == ERANGE) { if (errno == ERANGE) {
VLOG(0) << "error: the value is out of the range of " VLOG(0) << "error: the value is out of the range of "
"representable values for uint64_t"; "representable values for uint64_t";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: "
<< all_slots_.size();
VLOG(0) << "and in this slot: " << j
<< " th id with total id number: " << num;
VLOG(0) << "please check line<" << instance_cout << "> in file<" VLOG(0) << "please check line<" << instance_cout << "> in file<"
<< filename << ">"; << filename << ">";
return false; return false;
} }
          if (j + 1 != num && endptr - str == len) {
            VLOG(0) << "error: there is something wrong with the number of ids.";
VLOG(0) << "Error occured when parsing " << i
<< " th slot with total slots number: "
<< all_slots_.size();
VLOG(0) << "and in this slot: " << j
<< " th id with total id number: " << num;
VLOG(0) << "please check line<" << instance_cout << "> in file<" VLOG(0) << "please check line<" << instance_cout << "> in file<"
<< filename << ">"; << filename << ">";
return false; return false;
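The checks above rely on the C `strtof`/`strtoull` contract: `errno` flags out-of-range values, and the advancing end pointer reveals when the line ran out of tokens early. A self-contained sketch of the same pattern, with illustrative names (not Paddle's):

```cpp
#include <cerrno>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>

// Validate that `str` holds `num` parseable floats, mirroring the loop above.
bool check_floats(const char* str, int num) {
  const ptrdiff_t len = static_cast<ptrdiff_t>(std::strlen(str));
  char* endptr = const_cast<char*>(str);
  for (int j = 0; j < num; ++j) {
    errno = 0;
    std::strtof(endptr, &endptr);
    if (errno == ERANGE) {  // value not representable as float
      std::fprintf(stderr, "value %d out of float range\n", j);
      return false;
    }
    // The cursor reached the end of the line before all values were read.
    if (j + 1 != num && endptr - str == len) {
      std::fprintf(stderr, "expected %d values, line ended at %d\n", num, j + 1);
      return false;
    }
  }
  return true;
}

int main() {
  std::printf("%d\n", check_floats("1.0 2.5 3.75", 3));  // 1: all present
  std::printf("%d\n", check_floats("1.0 2.5", 3));       // 0: short line
}
```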
...@@ -632,8 +658,13 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
            "The number of ids can not be zero, you need padding "
            "it in data generator; or if there is something wrong with "
            "the data, please check if the data contains unresolvable "
"characters.\nplease check this error line: %s", "characters.\nplease check this error line: %s, \n Specifically, "
str)); "something wrong happened(the length of this slot's feasign is 0)"
"when we parse the %d th slots."
"Maybe something wrong around this slot",
"\nWe detect the feasign number of this slot is %d, "
"which is illegal.",
str, i, num));
        if (idx != -1) {
          (*instance)[idx].Init(all_slots_type_[i]);
          if ((*instance)[idx].GetType()[0] == 'f') {  // float
...@@ -683,8 +714,13 @@ bool MultiSlotDataFeed::ParseOneInstance(std::vector<MultiSlotType>* instance) {
            "The number of ids can not be zero, you need padding "
            "it in data generator; or if there is something wrong with "
            "the data, please check if the data contains unresolvable "
"characters.\nplease check this error line: %s.", "characters.\nplease check this error line: %s, \n Specifically, "
str)); "something wrong happened(the length of this slot's feasign is 0)"
"when we parse the %d th slots."
"Maybe something wrong around this slot",
"\nWe detect the feasign number of this slot is %d, "
"which is illegal.",
str, i, num));
        if (idx != -1) {
          (*instance)[idx].Init(all_slots_type_[i]);
...@@ -916,8 +952,13 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) {
            "The number of ids can not be zero, you need padding "
            "it in data generator; or if there is something wrong with "
            "the data, please check if the data contains unresolvable "
"characters.\nplease check this error line: %s.", "characters.\nplease check this error line: %s, \n Specifically, "
str)); "something wrong happened(the length of this slot's feasign is 0)"
"when we parse the %d th slots."
"Maybe something wrong around this slot",
"\nWe detect the feasign number of this slot is %d, "
"which is illegal.",
str, i, num));
        if (idx != -1) {
          if (all_slots_type_[i][0] == 'f') {  // float
            for (int j = 0; j < num; ++j) {
...@@ -982,8 +1023,13 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) {
            "The number of ids can not be zero, you need padding "
            "it in data generator; or if there is something wrong with "
            "the data, please check if the data contains unresolvable "
"characters.\nplease check this error line: %s.", "characters.\nplease check this error line: %s, \n Specifically, "
str)); "something wrong happened(the length of this slot's feasign is 0)"
"when we parse the %d th slots."
"Maybe something wrong around this slot",
"\nWe detect the feasign number of this slot is %d, "
"which is illegal.",
str, i, num));
        if (idx != -1) {
          if (all_slots_type_[i][0] == 'f') {  // float
......
...@@ -19,6 +19,8 @@ limitations under the License. */
namespace gloo {
namespace rendezvous {
constexpr int kNodeSize = 136;
HdfsStore::HdfsStore(const std::string& path) {
  path_ = path;
  wait_sleep_ms_ = 10000;
...@@ -213,12 +215,14 @@ void ParallelConnectContext::connectFullMesh(
  storeKey << rank;
  store.set(storeKey.str(), allBytes);
auto total_add_size = kNodeSize * (size - 1);
  std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
  // Connect every pair
  for (uint32_t i = 0; i < connect_threads.size(); ++i) {
    connect_threads[i].reset(new std::thread(
        [&store, &transportContext, total_add_size, this](
            size_t thread_idx, size_t thread_num) -> void {
          for (int i = thread_idx; i < size; i += thread_num) {
            if (i == rank) {
              continue;
...@@ -226,8 +230,23 @@ void ParallelConnectContext::connectFullMesh(
            // Wait for address of other side of this pair to become available
            std::string key = std::to_string(i);
            store.wait({key}, getTimeout());
std::vector<char> allAddrs;
auto max_retry_times = 5;
            // Connect to other side of this pair
while (max_retry_times > 0) {
allAddrs = store.get(key);
VLOG(3) << "store get all address size: " << allAddrs.size()
<< " except: " << total_add_size;
if (allAddrs.size() == static_cast<size_t>(total_add_size)) {
break;
}
--max_retry_times;
}
            auto addr = extractAddress(allAddrs, i);
            transportContext->getPair(i)->connect(addr);
          }
......
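The new retry loop guards against reading a partially written key from the rendezvous store: each node record is `kNodeSize` (136) bytes, so a complete payload must total `total_add_size` bytes. A minimal sketch of that bounded-retry shape under an assumed store interface (`FakeStore` and `get_full` are illustrative, not gloo's API):

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for the rendezvous store: each get() sees one more 136-byte
// (kNodeSize) record, mimicking a peer that is still publishing addresses.
struct FakeStore {
  int calls = 0;
  std::vector<char> get(const std::string&) {
    return std::vector<char>(static_cast<size_t>(++calls) * 136);
  }
};

// Bounded retry: re-read until the payload reaches the expected size or the
// retry budget runs out, as in the connectFullMesh change above.
template <typename Store>
std::vector<char> get_full(Store& store, const std::string& key,
                           size_t expected, int max_retry = 5) {
  std::vector<char> data;
  while (max_retry-- > 0) {
    data = store.get(key);
    if (data.size() == expected) break;  // complete payload observed
  }
  return data;  // caller should still validate the final size
}

int main() {
  FakeStore store;
  auto bytes = get_full(store, "3", 3 * 136);  // expect 3 peer records
  std::cout << bytes.size() << "\n";           // 408 after two short reads
}
```

Note that the real code still calls `extractAddress` afterwards, so a final size check (or an error once retries are exhausted) remains the caller's responsibility.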
...@@ -80,10 +80,10 @@ class EmbEltwiseLayerNormOpConverter : public OpConverter {
  nvinfer1::ILayer* layer = nullptr;

  if (engine_->with_dynamic_shape()) {
    auto use_fp16 = engine_->WithFp16();
    auto plugin = new plugin::EmbEltwiseLayernormPluginDynamic(
        input_embs, bias, scale, emb_sizes, bias_size, scale_size, hidden,
        eps, use_fp16);
    layer = engine_->AddPluginV2(input_ids.data(), input_num, plugin);
  } else {
    PADDLE_THROW(platform::errors::Fatal(
......
...@@ -32,13 +32,34 @@ namespace plugin {
#if IS_TRT_VERSION_GE(6000)

template <typename T>
EmbEltwiseLayernormPluginDynamicImpl<
    T>::~EmbEltwiseLayernormPluginDynamicImpl() {
  this->terminate();
}
inline half fp32tofp16(float x) { return static_cast<half>(x); }
template <typename T>
int EmbEltwiseLayernormPluginDynamicImpl<T>::initialize() {
  embs_gpu_.resize(embs_.size());
  for (int i = 0; i < embs_.size(); i++) {
    if (embs_[i]) {
      T *host_ptr;
      auto size = emb_sizes_[i];
if (std::is_same<T, half>::value) {
host_ptr = new T[size];
std::transform(embs_[i], (embs_[i] + size), host_ptr, fp32tofp16);
} else {
host_ptr = reinterpret_cast<T *>(embs_[i]);
}
cudaMalloc(&embs_gpu_[i], sizeof(T) * size);
cudaMemcpy(embs_gpu_[i], host_ptr, size * sizeof(T),
                 cudaMemcpyHostToDevice);
if (std::is_same<T, half>::value) {
delete[] host_ptr;
}
    }
  }
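For the fp16 build, `initialize()` cannot memcpy the stored fp32 weights straight to the device; it first converts them on the host (via `fp32tofp16` above) into a staging buffer and uploads that. A compile-only sketch of the same staging idea for nvcc; `upload_as_half` is an illustrative name, and error checking is omitted:

```cpp
#include <algorithm>
#include <cstddef>
#include <cuda_fp16.h>
#include <cuda_runtime.h>

// Convert n fp32 values to half on the CPU, then upload in one cudaMemcpy.
half* upload_as_half(const float* src, size_t n) {
  half* host = new half[n];  // temporary host-side staging buffer
  std::transform(src, src + n, host,
                 [](float x) { return static_cast<half>(x); });
  half* dev = nullptr;
  cudaMalloc(&dev, n * sizeof(half));
  cudaMemcpy(dev, host, n * sizeof(half), cudaMemcpyHostToDevice);
  delete[] host;  // staging copy no longer needed once data is on the device
  return dev;     // caller owns the buffer and must cudaFree it
}
```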
...@@ -53,11 +74,105 @@ int EmbEltwiseLayernormPluginDynamic<T>::initialize() {
                 cudaMemcpyHostToDevice);
  }
int input_num = embs_.size();
in_ptr_tensor_.Resize({input_num});
emb_ptr_tensor_.Resize({input_num});
cudaGetDevice(&device_id_);
auto emb_ptr_gpu_d =
emb_ptr_tensor_.mutable_data<int64_t>(platform::CUDAPlace(device_id_));
cudaMemcpy(emb_ptr_gpu_d, embs_gpu_.data(), sizeof(uintptr_t) * input_num,
cudaMemcpyHostToDevice);
  return 0;
}
template <typename T>
void EmbEltwiseLayernormPluginDynamicImpl<T>::terminate() {
for (int i = 0; i < embs_gpu_.size(); ++i) {
if (embs_gpu_[i]) {
cudaFree(embs_gpu_[i]);
embs_gpu_[i] = nullptr;
}
}
if (bias_gpu_) {
cudaFree(bias_gpu_);
bias_gpu_ = nullptr;
}
if (scale_gpu_) {
cudaFree(scale_gpu_);
scale_gpu_ = nullptr;
}
}
template <typename T>
int EmbEltwiseLayernormPluginDynamicImpl<T>::enqueue(
const nvinfer1::PluginTensorDesc *input_desc,
const nvinfer1::PluginTensorDesc *output_desc, const void *const *inputs,
void *const *outputs, void *workspace, cudaStream_t stream) {
auto id_dims = input_desc[0].dims;
int batch = id_dims.d[0];
int seq_len = id_dims.d[1];
int input_num = embs_.size();
auto in_ptr_gpu_d =
in_ptr_tensor_.mutable_data<int64_t>(platform::CUDAPlace(device_id_));
auto emb_ptr_gpu_d =
emb_ptr_tensor_.mutable_data<int64_t>(platform::CUDAPlace(device_id_));
auto new_input_ptr = reinterpret_cast<uintptr_t>(inputs[0]);
if (old_input_ptr_ != new_input_ptr) {
old_input_ptr_ = new_input_ptr;
cudaMemcpyAsync(in_ptr_gpu_d, reinterpret_cast<const void *>(inputs),
sizeof(uintptr_t) * input_num, cudaMemcpyHostToDevice,
stream);
}
auto out_type = output_desc[0].type;
if (std::is_same<T, float>::value) {
PADDLE_ENFORCE_EQ(
out_type == nvinfer1::DataType::kFLOAT, true,
platform::errors::InvalidArgument(
"The EmbEltwiseLayernorm Plugin only support fp32 input."));
} else if (std::is_same<T, half>::value) {
PADDLE_ENFORCE_EQ(
out_type == nvinfer1::DataType::kHALF, true,
platform::errors::InvalidArgument(
"The EmbEltwiseLayernorm Plugin only support fp16 input."));
} else {
PADDLE_THROW(platform::errors::Fatal(
"Unsupport data type, the out type of EmbEltwiseLayernorm should be "
"float or half."));
}
auto *output_d = reinterpret_cast<T *>(outputs[0]);
operators::math::EmbEltwiseLayerNormFunctor<T> emb_eltwise_layernorm_func;
emb_eltwise_layernorm_func(batch, seq_len, hidden_size_, in_ptr_gpu_d,
scale_gpu_, bias_gpu_, emb_ptr_gpu_d, output_d,
eps_, input_num, stream);
return cudaGetLastError() != cudaSuccess;
}
template class EmbEltwiseLayernormPluginDynamicImpl<float>;
#ifdef SUPPORTS_CUDA_FP16
template class EmbEltwiseLayernormPluginDynamicImpl<half>;
#endif // SUPPORTS_CUDA_FP16
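One detail worth noting in `Impl::enqueue()` above: the device-side table of input pointers is rebuilt only when `old_input_ptr_` changes, so steady-state calls skip a host-to-device copy. A sketch of that caching idea with illustrative names; like the plugin, it assumes TensorRT rebinds all inputs together when any binding changes:

```cpp
#include <cstdint>
#include <cuda_runtime.h>

struct InputPtrCache {
  uintptr_t last_first_ptr = 0;
  int64_t* dev_table = nullptr;  // pre-allocated device array, one slot per input

  // Upload the host array of input pointers only when the first binding moved.
  void refresh(const void* const* inputs, int n, cudaStream_t stream) {
    auto first = reinterpret_cast<uintptr_t>(inputs[0]);
    if (first == last_first_ptr) return;  // bindings unchanged, table is valid
    last_first_ptr = first;
    cudaMemcpyAsync(dev_table, inputs, n * sizeof(uintptr_t),
                    cudaMemcpyHostToDevice, stream);
  }
};
```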
int EmbEltwiseLayernormPluginDynamic::initialize() {
impl_->initialize();
return 0;
}
void EmbEltwiseLayernormPluginDynamic::terminate() { impl_->terminate(); }
nvinfer1::DimsExprs EmbEltwiseLayernormPluginDynamic::getOutputDimensions(
    int output_index, const nvinfer1::DimsExprs *inputs, int nb_inputs,
    nvinfer1::IExprBuilder &expr_builder) {  // NOLINT
  PADDLE_ENFORCE_EQ(output_index, 0,
...@@ -76,18 +191,7 @@ nvinfer1::DimsExprs EmbEltwiseLayernormPluginDynamic<T>::getOutputDimensions(
  return ret;
}
bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination(
    int pos, const nvinfer1::PluginTensorDesc *in_out, int nb_inputs,
    int nb_outputs) {
  PADDLE_ENFORCE_NOT_NULL(
...@@ -98,6 +202,11 @@ bool EmbEltwiseLayernormPluginDynamic<T>::supportsFormatCombination(
                          "The EmbEltwiseLayerNorm's output should be one "
                          "but it's (%d) outputs.",
                          nb_outputs));
  PADDLE_ENFORCE_EQ(nb_outputs, 1,
                    platform::errors::InvalidArgument(
                        "The EmbEltwiseLayerNorm's output should be one "
                        "but it's (%d) outputs.",
                        nb_outputs));
  PADDLE_ENFORCE_LT(
      pos, nb_inputs + nb_outputs,
      platform::errors::InvalidArgument("The pos(%d) should be less than the "
...@@ -122,7 +231,7 @@ bool EmbEltwiseLayernormPluginDynamic<T>::supportsFormatCombination(
  }

  if (pos == all_nums - 1) {
    if (with_fp16_ == false) {
      return desc.type == nvinfer1::DataType::kFLOAT;
    } else {
      return desc.type == nvinfer1::DataType::kHALF;
...@@ -131,84 +240,27 @@ bool EmbEltwiseLayernormPluginDynamic<T>::supportsFormatCombination(
  return false;
}
nvinfer1::DataType EmbEltwiseLayernormPluginDynamic::getOutputDataType(
    int index, const nvinfer1::DataType *input_types, int nb_inputs) const {
  PADDLE_ENFORCE_EQ(
      index, 0, platform::errors::InvalidArgument(
                    "The EmbEltwiseLayernorm Plugin only has one input, so the "
                    "index value should be 0, but get %d.",
                    index));
  if (with_fp16_)
return nvinfer1::DataType::kHALF;
else
return nvinfer1::DataType::kFLOAT;
}
int EmbEltwiseLayernormPluginDynamic::enqueue(
    const nvinfer1::PluginTensorDesc *input_desc,
    const nvinfer1::PluginTensorDesc *output_desc, const void *const *inputs,
    void *const *outputs, void *workspace, cudaStream_t stream) {
  impl_->enqueue(input_desc, output_desc, inputs, outputs, workspace, stream);
  return cudaGetLastError() != cudaSuccess;
}
#endif

}  // namespace plugin
......
...@@ -27,14 +27,76 @@ namespace tensorrt {
namespace plugin {

#if IS_TRT_VERSION_GE(6000)
class EmbEltwiseLayernormPluginDynamicImplBase {
public:
EmbEltwiseLayernormPluginDynamicImplBase() {}
virtual ~EmbEltwiseLayernormPluginDynamicImplBase() {}
virtual int initialize() = 0;
virtual void terminate() = 0;
virtual int enqueue(const nvinfer1::PluginTensorDesc* inputDesc,
const nvinfer1::PluginTensorDesc* outputDesc,
const void* const* inputs, void* const* outputs,
void* workspace, cudaStream_t stream) = 0;
};
template <typename T>
class EmbEltwiseLayernormPluginDynamicImpl
: public EmbEltwiseLayernormPluginDynamicImplBase {
public:
explicit EmbEltwiseLayernormPluginDynamicImpl(std::vector<float*> input_embs,
float* bias, float* scale,
std::vector<int> emb_sizes,
int bias_size, int scale_size,
int hidden_size, float eps)
: embs_(input_embs),
bias_(bias),
scale_(scale),
emb_sizes_(emb_sizes),
bias_size_(bias_size),
scale_size_(scale_size),
hidden_size_(hidden_size),
eps_(eps) {}
~EmbEltwiseLayernormPluginDynamicImpl();
int initialize();
void terminate();
int enqueue(const nvinfer1::PluginTensorDesc* inputDesc,
const nvinfer1::PluginTensorDesc* outputDesc,
const void* const* inputs, void* const* outputs, void* workspace,
cudaStream_t stream);
private:
std::vector<float*> embs_;
float* bias_{nullptr};
float* scale_{nullptr};
// data on devices
float* bias_gpu_{nullptr};
float* scale_gpu_{nullptr};
std::vector<T*> embs_gpu_;
std::vector<int> emb_sizes_;
int bias_size_;
int scale_size_;
int hidden_size_;
float eps_;
framework::Tensor in_ptr_tensor_, emb_ptr_tensor_;
int device_id_{0};
uintptr_t old_input_ptr_{0};
};
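The `Impl` split above is a type-erasure (pimpl) refactor: the public plugin class no longer carries a template parameter, so a serialized engine can be deserialized without knowing at compile time whether it ran in fp32 or fp16; the precision is chosen at runtime from `with_fp16_`. A minimal sketch of the same shape, with hypothetical names:

```cpp
#include <iostream>
#include <memory>

// Non-template interface that hides the data type behind virtual dispatch.
struct ImplBase {
  virtual ~ImplBase() = default;
  virtual void run() = 0;
};

template <typename T>
struct Impl : ImplBase {
  void run() override { std::cout << "run with sizeof(T)=" << sizeof(T) << "\n"; }
};

class Plugin {  // public class carries no template parameter
 public:
  explicit Plugin(bool with_fp16)
      : impl_(with_fp16 ? static_cast<ImplBase*>(new Impl<short>)  // half stand-in
                        : static_cast<ImplBase*>(new Impl<float>)) {}
  void run() { impl_->run(); }

 private:
  std::unique_ptr<ImplBase> impl_;
};

int main() {
  Plugin(true).run();   // dispatches to the 2-byte specialization
  Plugin(false).run();  // dispatches to the fp32 specialization
}
```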
class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT {
 public:
  explicit EmbEltwiseLayernormPluginDynamic(std::vector<float*> input_embs,
                                            float* bias, float* scale,
                                            std::vector<int> emb_sizes,
                                            int bias_size, int scale_size,
                                            int hidden_size, float eps,
                                            bool with_fp16)
      : embs_(input_embs),
        bias_(bias),
        scale_(scale),
...@@ -42,51 +104,81 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT {
        bias_size_(bias_size),
        scale_size_(scale_size),
        hidden_size_(hidden_size),
        eps_(eps),
with_fp16_(with_fp16),
own_host_buff_(false) {
if (with_fp16) {
#ifdef SUPPORTS_CUDA_FP16
impl_ = new EmbEltwiseLayernormPluginDynamicImpl<half>(
embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_,
hidden_size_, eps_);
#else
PADDLE_THROW(platform::errors::Fatal(
"Unsupported data type, current GPU doesn't support half."));
#endif // SUPPORTS_CUDA_FP16
} else {
impl_ = new EmbEltwiseLayernormPluginDynamicImpl<float>(
embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_,
hidden_size_, eps_);
}
}
  EmbEltwiseLayernormPluginDynamic(void const* serial_data,
                                   size_t serial_length)
      : own_host_buff_(true) {
    DeserializeValue(&serial_data, &serial_length, &emb_sizes_);

    embs_.resize(emb_sizes_.size());
    for (size_t i = 0; i < emb_sizes_.size(); i++) {
      auto size = emb_sizes_[i];
      auto ptr = new float[size];
      memcpy(ptr, serial_data, sizeof(float) * size);
      embs_[i] = ptr;
      reinterpret_cast<char const*&>(serial_data) +=
          emb_sizes_[i] * sizeof(float);
      serial_length -= emb_sizes_[i] * sizeof(float);
    }
    DeserializeValue(&serial_data, &serial_length, &bias_size_);
    DeserializeValue(&serial_data, &serial_length, &scale_size_);

    if (bias_size_) {
      bias_ = new float[bias_size_];
      memcpy(bias_, serial_data, sizeof(float) * bias_size_);
    }
    reinterpret_cast<char const*&>(serial_data) += bias_size_ * sizeof(float);
    serial_length -= bias_size_ * sizeof(float);

    if (scale_size_) {
      scale_ = new float[scale_size_];
      memcpy(scale_, serial_data, sizeof(float) * scale_size_);
    }
    reinterpret_cast<char const*&>(serial_data) += scale_size_ * sizeof(float);
    serial_length -= scale_size_ * sizeof(float);

    DeserializeValue(&serial_data, &serial_length, &hidden_size_);
    DeserializeValue(&serial_data, &serial_length, &eps_);
DeserializeValue(&serial_data, &serial_length, &with_fp16_);
if (with_fp16_) {
#ifdef SUPPORTS_CUDA_FP16
impl_ = new EmbEltwiseLayernormPluginDynamicImpl<half>(
embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_,
hidden_size_, eps_);
#else
PADDLE_THROW(platform::errors::Fatal(
"Unsupported data type, current GPU doesn't support half."));
#endif // SUPPORTS_CUDA_FP16
} else {
impl_ = new EmbEltwiseLayernormPluginDynamicImpl<float>(
embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_,
hidden_size_, eps_);
}
  }

  nvinfer1::IPluginV2DynamicExt* clone() const override {
    auto ptr = new EmbEltwiseLayernormPluginDynamic(
        embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_,
        hidden_size_, eps_, with_fp16_);
    return ptr;
  }
...@@ -95,6 +187,7 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT {
  }

  int getNbOutputs() const override { return 1; }

  int initialize() override;
void terminate() override;
  size_t getSerializationSize() const override {
    int sum_num = 0;
...@@ -110,24 +203,32 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT {
    sum_num += (bias_size_ + scale_size_) * sizeof(float);
    sum_num += SerializedSize(hidden_size_);
    sum_num += SerializedSize(eps_);
    sum_num += SerializedSize(with_fp16_);

    return sum_num;
  }
  void serialize(void* buffer) const override {
    SerializeValue(&buffer, emb_sizes_);
    for (size_t i = 0; i < emb_sizes_.size(); i++) {
      auto size = emb_sizes_[i];
for (int j = 0; j < size; ++j) {
SerializeValue(&buffer, embs_[i][j]);
}
    }
    SerializeValue(&buffer, bias_size_);
    SerializeValue(&buffer, scale_size_);
    for (int i = 0; i < bias_size_; ++i) {
      SerializeValue(&buffer, bias_[i]);
}
for (int i = 0; i < scale_size_; ++i) {
SerializeValue(&buffer, scale_[i]);
}
    SerializeValue(&buffer, hidden_size_);
    SerializeValue(&buffer, eps_);
SerializeValue(&buffer, with_fp16_);
  }
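`serialize()` and the deserializing constructor above must agree on a strict field order (`emb_sizes_`, the embedding payloads, the bias/scale sizes and payloads, `hidden_size_`, `eps_`, `with_fp16_`). A self-contained sketch of this value-by-value round-trip; `SerializeValue`/`DeserializeValue` here are simplified stand-ins for Paddle's helpers, not the real implementations:

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Append the raw bytes of a POD value to the buffer.
template <typename T>
void SerializeValue(std::vector<char>* buf, const T& v) {
  const char* p = reinterpret_cast<const char*>(&v);
  buf->insert(buf->end(), p, p + sizeof(T));
}

// Read a POD value back, advancing the cursor and shrinking the length.
template <typename T>
void DeserializeValue(const char** data, size_t* len, T* v) {
  assert(*len >= sizeof(T));
  std::memcpy(v, *data, sizeof(T));
  *data += sizeof(T);
  *len -= sizeof(T);
}

int main() {
  std::vector<char> buf;
  SerializeValue(&buf, 768);    // e.g. hidden_size_
  SerializeValue(&buf, 1e-5f);  // e.g. eps_
  SerializeValue(&buf, true);   // e.g. with_fp16_

  const char* data = buf.data();
  size_t len = buf.size();
  int hidden; float eps; bool fp16;
  DeserializeValue(&data, &len, &hidden);  // must mirror the write order
  DeserializeValue(&data, &len, &eps);
  DeserializeValue(&data, &len, &fp16);
  assert(hidden == 768 && eps == 1e-5f && fp16);
}
```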
  nvinfer1::DimsExprs getOutputDimensions(
...@@ -158,23 +259,33 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT {
      const nvinfer1::DataType* input_types,
      int nb_inputs) const override;

  void destroy() override {
if (own_host_buff_) {
for (auto ptr : embs_) {
delete[] ptr;
}
delete[] bias_;
delete[] scale_;
}
delete impl_;
delete this;
}
 private:
  std::vector<float*> embs_;
  float* bias_;
  float* scale_;

  std::vector<int> emb_sizes_;
  int bias_size_;
  int scale_size_;
  int hidden_size_;
  float eps_;
bool with_fp16_;
bool own_host_buff_{false};
EmbEltwiseLayernormPluginDynamicImplBase* impl_{nullptr};
};

class EmbEltwiseLayernormPluginV2Creator : public nvinfer1::IPluginCreator {
...@@ -198,8 +309,7 @@ class EmbEltwiseLayernormPluginV2Creator : public nvinfer1::IPluginCreator {
  nvinfer1::IPluginV2* deserializePlugin(const char* name,
                                         const void* serial_data,
                                         size_t serial_length) override {
    return new EmbEltwiseLayernormPluginDynamic(serial_data, serial_length);
  }

  void setPluginNamespace(const char* lib_namespace) override {
......
...@@ -151,7 +151,7 @@ void trt_ernie(bool with_fp16, std::vector<float> result) {
  run(config, &out_data);         // serialize
  run(*config_deser, &out_data);  // deserialize
  for (size_t i = 0; i < out_data.size(); i++) {
    EXPECT_NEAR(result[i], out_data[i], 1e-2);
  }
}
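Relaxing the tolerance from 1e-6 to 1e-2 is expected once the plugin can run in half precision: fp16 keeps a 10-bit mantissa, so single values near 0.6 are exact only to roughly 5e-4, and error accumulates through the layers. A quick back-of-envelope check:

```cpp
#include <cmath>
#include <cstdio>

int main() {
  // Spacing between adjacent fp16 values in [0.5, 1): 2^(-1) * 2^(-10) = 2^-11.
  double ulp = std::ldexp(1.0, -11);
  std::printf("fp16 step near 0.6: %g\n", ulp);  // ~0.000488
  // Embedding sums, layernorm, and later layers compound this rounding,
  // so 1e-6 is unachievable in fp16 while 1e-2 leaves sane headroom.
}
```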
...@@ -159,13 +159,11 @@ TEST(AnalysisPredictor, no_fp16) {
  std::vector<float> result = {0.597841, 0.219972, 0.182187};
  trt_ernie(false, result);
}

#ifdef SUPPORTS_CUDA_FP16
TEST(AnalysisPredictor, fp16) {
  std::vector<float> result = {0.59923654, 0.21923761, 0.18152587};
  trt_ernie(true, result);
}
#endif // SUPPORTS_CUDA_FP16
}  // namespace inference
}  // namespace paddle
...@@ -121,6 +121,18 @@ function cmake_base() {
            else
                exit 1
            fi
elif [ "$1" == "cp38-cp38" ]; then
if [ -d "/Library/Frameworks/Python.framework/Versions/3.8" ]; then
export LD_LIBRARY_PATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/
export DYLD_LIBRARY_PATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/
export PATH=/Library/Frameworks/Python.framework/Versions/3.8/bin/:${PATH}
PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/Library/Frameworks/Python.framework/Versions/3.8/bin/python3
-DPYTHON_INCLUDE_DIR:PATH=/Library/Frameworks/Python.framework/Versions/3.8/include/python3.8/
-DPYTHON_LIBRARY:FILEPATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/libpython3.8.dylib"
pip3.8 install --user -r ${PADDLE_ROOT}/python/requirements.txt
else
exit 1
fi
        fi
        # delete `gym` to avoid modifying requirements.txt in *.whl
        sed -i .bak "/^gym$/d" ${PADDLE_ROOT}/python/requirements.txt
...@@ -176,6 +188,13 @@ function cmake_base() {
                -DPYTHON_INCLUDE_DIR:PATH=/opt/_internal/cpython-3.7.0/include/python3.7m
                -DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-3.7.0/lib/libpython3.so"
            pip3.7 install -r ${PADDLE_ROOT}/python/requirements.txt
elif [ "$1" == "cp38-cp38" ]; then
export LD_LIBRARY_PATH=/opt/_internal/cpython-3.8.0/lib/:${LD_LIBRARY_PATH}
export PATH=/opt/_internal/cpython-3.8.0/bin/:${PATH}
export PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/opt/_internal/cpython-3.8.0/bin/python3.8
-DPYTHON_INCLUDE_DIR:PATH=/opt/_internal/cpython-3.8.0/include/python3.8
-DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-3.8.0/lib/libpython3.so"
pip3.8 install -r ${PADDLE_ROOT}/python/requirements.txt
        fi
    else
        pip install -r ${PADDLE_ROOT}/python/requirements.txt
...@@ -514,6 +533,8 @@ EOF
            pip3.6 uninstall -y paddlepaddle
        elif [ "$1" == "cp37-cp37m" ]; then
            pip3.7 uninstall -y paddlepaddle
elif [ "$1" == "cp38-cp38" ]; then
pip3.8 uninstall -y paddlepaddle
        fi
        set -ex
...@@ -527,6 +548,8 @@ EOF
            pip3.6 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl
        elif [ "$1" == "cp37-cp37m" ]; then
            pip3.7 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl
elif [ "$1" == "cp38-cp38" ]; then
pip3.8 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl
        fi
        tmpfile_rand=`date +%s%N`
        tmpfile=$tmp_dir/$tmpfile_rand
...@@ -666,7 +689,7 @@ function generate_api_spec() {
    awk -F '(' '{print $NF}' $spec_path >${spec_path}.doc
    awk -F '(' '{$NF="";print $0}' $spec_path >${spec_path}.api
    if [ "$1" == "cp35-cp35m" ] || [ "$1" == "cp36-cp36m" ] || [ "$1" == "cp37-cp37m" ] || [ "$1" == "cp38-cp38" ]; then
        # Use sed to keep the python2 and python3 spec the same
        sed -i 's/arg0: str/arg0: unicode/g' $spec_path
        sed -i "s/\(.*Transpiler.*\).__init__ (ArgSpec(args=\['self'].*/\1.__init__ /g" $spec_path
...@@ -1244,21 +1267,25 @@ EOF
    ref_paddle35=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp35-cp35m-linux_x86_64.whl
    ref_paddle36=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp36-cp36m-linux_x86_64.whl
    ref_paddle37=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp37-cp37m-linux_x86_64.whl
ref_paddle38=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp38-cp38-linux_x86_64.whl
    ref_paddle2_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp27-cp27mu-linux_x86_64.whl
    ref_paddle35_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp35-cp35m-linux_x86_64.whl
    ref_paddle36_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp36-cp36m-linux_x86_64.whl
    ref_paddle37_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp37-cp37m-linux_x86_64.whl
ref_paddle38_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp38-cp38-linux_x86_64.whl
    if [[ ${PADDLE_BRANCH} != "0.0.0" && ${WITH_MKL} == "ON" && ${WITH_GPU} == "ON" ]]; then
        ref_paddle2=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp27-cp27mu-linux_x86_64.whl
        ref_paddle35=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp35-cp35m-linux_x86_64.whl
        ref_paddle36=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp36-cp36m-linux_x86_64.whl
        ref_paddle37=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp37-cp37m-linux_x86_64.whl
ref_paddle38=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp38-cp38-linux_x86_64.whl
        ref_paddle2_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp27-cp27mu-linux_x86_64.whl
        ref_paddle35_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp35-cp35m-linux_x86_64.whl
        ref_paddle36_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp36-cp36m-linux_x86_64.whl
        ref_paddle37_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp37-cp37m-linux_x86_64.whl
ref_paddle38_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp38-cp38-linux_x86_64.whl
    fi

    #ref_paddle2_mv1=""
...@@ -1363,6 +1390,22 @@ EOF
    apt-get clean -y && \
    rm -f ${ref_paddle37} && \
    ldconfig
EOF
cat >> ${PADDLE_ROOT}/build/Dockerfile <<EOF
# run paddle version to install python packages first
RUN apt-get update && ${NCCL_DEPS}
RUN apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev \
libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev \
xz-utils tk-dev libffi-dev liblzma-dev
RUN wget -q https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
tar -xzf Python-3.8.0.tgz && cd Python-3.8.0 && \
CFLAGS="-Wformat" ./configure --prefix=/usr/local/ --enable-shared > /dev/null && \
make -j8 > /dev/null && make altinstall > /dev/null && cd ../ && rm Python-3.8.0.tgz
RUN apt-get install -y libgtk2.0-dev dmidecode python3-tk && ldconfig && \
pip3.8 install opencv-python && wget ${ref_web}/${ref_paddle38} && pip3.8 install ${ref_paddle38_whl}; apt-get install -f -y && \
apt-get clean -y && \
rm -f ${ref_paddle38} && \
ldconfig
EOF
    cat >> ${PADDLE_ROOT}/build/Dockerfile <<EOF
    # run paddle version to install python packages first
......
...@@ -39,6 +39,7 @@ server_num = fleet.server_num
server_index = fleet.server_index
server_endpoints = fleet.server_endpoints
is_server = fleet.is_server
set_util = fleet.set_util
util = fleet.util
barrier_worker = fleet.barrier_worker
init_worker = fleet.init_worker
......
...@@ -180,6 +180,8 @@ class Fleet(object):
            raise ValueError(
                "`role_maker` should be subclass of `RoleMakerBase`, but got {}".
                format(type(role_maker)))
self._role_maker.generate_role()
        self.strategy_compiler = StrategyCompiler()
        if paddle.fluid.framework.in_dygraph_mode():
            if parallel_helper._is_parallel_ctx_initialized():
...@@ -187,7 +189,6 @@ class Fleet(object):
                    "The dygraph parallel environment has been initialized.")
            else:
                paddle.distributed.init_parallel_env()
    def is_first_worker(self):
        """
...@@ -275,13 +276,10 @@ class Fleet(object):
            fleet.worker_endpoints()
        """
        if to_string:
            return ",".join(self._role_maker.get_trainer_endpoints())
        else:
            return self._role_maker.get_trainer_endpoints()
    def server_num(self):
        """
...@@ -355,7 +353,9 @@ class Fleet(object):
        return self._role_maker.is_server(
        ) or self._role_maker._is_heter_worker()
    def set_util(self, util):
self._util = util
    def util(self):
""" """
Utility functions that can be used under certain runtime Utility functions that can be used under certain runtime
...@@ -376,16 +376,6 @@ class Fleet(object): ...@@ -376,16 +376,6 @@ class Fleet(object):
""" """
return self._util return self._util
    def barrier_worker(self):
        """
        barrier all workers
...@@ -393,7 +383,7 @@ class Fleet(object):
        Returns:
            None
        """
        self._role_maker._barrier("worker")
    @is_non_distributed_check
    @inited_runtime_handler
......
...@@ -13,18 +13,332 @@
# limitations under the License.
"""Definition of Role Makers."""

import os
import time
import numpy as np
import warnings
from multiprocessing import Process, Manager

import paddle.fluid as fluid
class Role:
    WORKER = 1
    SERVER = 2
    HETER_WORKER = 3
ALL = 4
class Gloo(object):
"""
Gloo is a universal class for barrier and collective communication
"""
class RENDEZVOUS:
HDFS = 1
FILE = 2
HTTP = 3
def __init__(self):
self._worker_comm = None
self._server_comm = None
self._nodes_comm = None
self._comm_world = ["worker", "server", "all"]
self._err_init = "gloo is not initialized, will not communicator with other nodes"
self._err_type = "gloo initialized error, please check arguments"
self._err_world = "argument error, comm_world must in {}".format(
self._comm_world)
self._is_initialized = False
self._init_timeout_seconds = 3600
self._run_timeout_seconds = 9999999
self._rendezvous = None
self._role = None
self._iface = None
self._role_id = -1
self._worker_num = -1
self._server_num = -1
self._need_init_all = False
def init(self,
rendezvous,
role,
role_id,
worker_num,
server_num,
need_init_all=False,
kwargs=None):
self._rendezvous = rendezvous
self._role = role
self._role_id = role_id
self._worker_num = worker_num
self._server_num = server_num
self._need_init_all = need_init_all
self._iface = self.__get_default_iface()
self._prefix = kwargs.get("store.prefix", "")
if self._rendezvous == Gloo.RENDEZVOUS.HDFS:
dfs_name = kwargs.get("dfs.name", "")
dfs_ugi = kwargs.get("dfs.ugi", "")
dfs_path = kwargs.get("dfs.path", "")
if not dfs_name or not dfs_ugi or not dfs_path:
raise ValueError(self._err_type)
self._init_dfs(dfs_name, dfs_ugi, dfs_path, self._prefix)
elif self._rendezvous == Gloo.RENDEZVOUS.FILE:
fs_path = kwargs.get("dfs.path", "")
if not fs_path:
raise ValueError(self._err_type)
self._init_fs(fs_path, self._prefix)
elif self._rendezvous == Gloo.RENDEZVOUS.HTTP:
ip = kwargs.get("http.host", "")
port = kwargs.get("http.port", "")
if not ip or not port:
raise ValueError(self._err_type)
self._init_http(ip, port, self._prefix)
else:
raise ValueError(self._err_type)
self._is_initialized = True
def _init_fs(self, fs_path, prefix):
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_hdfs_store(os.path.join(fs_path, role), "", "")
gloo.init()
return gloo
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _init_dfs(self, dfs_name, dfs_ugi, dfs_path, prefix):
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_hdfs_store(os.path.join(dfs_path, role), dfs_name, dfs_ugi)
gloo.init()
return gloo
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _init_http(self, ip, port, prefix):
def __start_kv_server(http_server_d, size_d):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(port, size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
time.sleep(wait_seconds)
http_server.stop()
def init_kv_server():
size_d = {
"trainer": self._worker_num,
"pserver": self._server_num,
"all": self._worker_num + self._server_num
}
_http_server_d = {"running": True}
# child process for http server
_http_server = Process(
target=__start_kv_server, args=(_http_server_d, size_d))
_http_server.daemon = True
# set running status to True
# start child process
_http_server.start()
def init(rank, nodes, role):
gloo = fluid.core.Gloo()
gloo.set_rank(rank)
gloo.set_size(nodes)
gloo.set_prefix(prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
gloo.set_http_store(ip, port, role)
return gloo
port = int(port)
if self._role == Role.SERVER and self._role_id == 0:
init_kv_server()
if self._role == Role.WORKER:
rank, nodes = self._get_rank_nodes(Role.WORKER)
gloo = init(rank, nodes, "WORKER")
self._worker_comm = gloo
else:
rank, nodes = self._get_rank_nodes(Role.SERVER)
gloo = init(rank, nodes, "SERVER")
self._server_comm = gloo
if self._need_init_all:
rank, nodes = self._get_rank_nodes(Role.ALL)
gloo = init(rank, nodes, "ALL")
self._nodes_comm = gloo
def _get_rank_nodes(self, role):
nodes = 0
rank = -1
if role == Role.WORKER:
nodes = self._worker_num
rank = self._role_id
elif role == Role.SERVER:
nodes = self._server_num
rank = self._role_id
elif role == Role.ALL:
nodes = self._worker_num + self._server_num
if self._role == Role.WORKER:
rank = self._role_id
else:
rank = self._worker_num + self._role_id
else:
            raise ValueError(self._err_type)
return rank, nodes
def __get_default_iface(self):
"""
get default physical interface
"""
default1 = self.__get_default_iface_from_gateway()
default2 = self.__get_default_iface_from_interfaces()
return default2 if default1 == "lo" else default1
def __get_default_iface_from_gateway(self):
"""
get default physical interface
"""
import netifaces
gateways = netifaces.gateways()
if gateways.get(netifaces.AF_INET) != None:
gateway = gateways[netifaces.AF_INET]
if len(gateway) > 0 and len(gateway[0]) > 1:
return gateway[0][1]
return "lo"
def __get_default_iface_from_interfaces(self):
"""
get default physical interface
"""
import netifaces
for intf_name in netifaces.interfaces():
addresses = netifaces.ifaddresses(intf_name)
if netifaces.AF_INET in addresses:
ipv4_addresses = addresses[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
if 'broadcast' in ipv4_address:
return intf_name
return "lo"
    def barrier(self, comm_world):
        """
        barrier between processes in the given comm_world
        """
if not self._is_initialized:
warnings.warn(self._err_init)
return
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
if comm_world == "worker":
self._worker_comm.barrier()
elif comm_world == "server":
self._server_comm.barrier()
else:
self._nodes_comm.barrier()
def all_reduce(self, input, mode="sum", comm_world="worker"):
if not self._is_initialized:
warnings.warn(self._err_init)
return input
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
input = np.array(input)
input_shape = input.shape
input_list = input.reshape(-1).tolist()
self.barrier(comm_world)
if comm_world == "worker":
ans = self._worker_comm.all_reduce(input_list, mode)
elif comm_world == "server":
ans = self._server_comm.all_reduce(input_list, mode)
else:
ans = self._nodes_comm.all_reduce(input_list, mode)
output = np.array(ans).reshape(input_shape)
return output
    def all_gather(self, input, comm_world="worker"):
        """
        all gather between processes in comm_world

        Args:
            input(any): obj to do all gather
        """
if not self._is_initialized:
warnings.warn(self._err_init)
return input
if comm_world not in self._comm_world:
raise ValueError(self._err_world)
if comm_world == "worker":
output = self._worker_comm.all_gather(input)
elif comm_world == "server":
output = self._server_comm.all_gather(input)
else:
output = self._nodes_comm.all_gather(input)
return output
class RoleMakerBase(object):
...@@ -47,10 +361,6 @@ class RoleMakerBase(object):
        self._heter_trainer_device = "CPU"
        self._is_heter_parameter_server_mode = False
    def is_worker(self):
        """
        return is_worker() of current process
...@@ -142,19 +452,11 @@ class RoleMakerBase(object):
            self._role, self._current_id, self._worker_endpoints,
            self._server_endpoints)
    def _all_gather(self, comm_world, input):
        """
        Args:
            input(int|float): input value
        Returns:
            return a list of values
        """
        print("warning: RoleMakerBase does not have all gather.")
        return None
    def _all_gather(self, input, comm_world="worker"):
        print("warning: RoleMakerBase does not have all gather worker.")
        return None
    def _all_reduce(self, input, mode="sum", comm_world="worker"):
        """
        Args:
            input(list/numpy.array): array of one dim
...@@ -221,73 +523,25 @@ class PaddleCloudRoleMaker(RoleMakerBase):
    def __init__(self, is_collective=False, **kwargs):
        super(PaddleCloudRoleMaker, self).__init__()
        self._is_collective = is_collective
        self._init_gloo = False  # default no init gloo
        self._kwargs = kwargs
        self._non_distributed = False
        self._role_is_generated = False
        self._server_endpoints = None
        self._worker_endpoints = None
        self._node_type_comm = None
        self._gloo = Gloo()  # gloo instance
        self._all_comm = None
if not self._is_collective:
self._hdfs_name = kwargs.get("hdfs_name", "")
self._hdfs_ugi = kwargs.get("hdfs_ugi", "")
self._hdfs_path = kwargs.get("path", "").rstrip("/")
self._init_timeout_seconds = kwargs.get("init_timeout_seconds",
3600)
self._run_timeout_seconds = kwargs.get("run_timeout_seconds",
9999999)
ip_port = kwargs.get("http_ip_port", "")
self._http_ip_port = []
self._http_server = None
# if ip_port is not empty, it will use http instead of hdfs
if ip_port != "":
self._http_ip_port = ip_port.split(":")
# it's for communication between processes
self._manager = Manager()
# global dict to store status
self._http_server_d = self._manager.dict()
# set running status of http server
self._http_server_d["running"] = False
self._iface = self.__get_default_iface()
# this environment variable can be empty
self._prefix = os.getenv("SYS_JOB_ID", "")
    def _barrier(self, comm_world):
        if isinstance(comm_world, fluid.core.Gloo):
            comm_world.barrier()
        else:
            print("warning: must init Gloo before using _barrier() function")
    def _all_gather(self, comm_world, input):
        if isinstance(comm_world, fluid.core.Gloo):
            self._barrier(comm_world)
            output = comm_world.all_gather(input)
            return output
        else:
            print("warning: must init Gloo before using _all_gather() function")
            return None
    def _all_reduce(self, comm_world, input, mode="sum"):
        if isinstance(comm_world, fluid.core.Gloo):
            input = np.array(input)
            input_shape = input.shape
            input_list = input.reshape(-1).tolist()
            self._barrier(comm_world)
            ans = comm_world.all_reduce(input_list, mode)
            output = np.array(ans).reshape(input_shape)
            return output
        else:
            print("warning: must init Gloo before using _all_reduce() function")
            return None
    def _barrier(self, comm_world):
        self._gloo.barrier(comm_world)
    def _all_gather(self, input, comm_world="worker"):
        return self._gloo.all_gather(input, comm_world)
    def _all_reduce(self, input, mode="sum", comm_world="worker"):
        return self._gloo.all_reduce(input, mode, comm_world)
    def is_worker(self):
        """
...@@ -349,7 +603,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        """
        if not self._role_is_generated:
            self.generate_role()
        return self._trainers_num
        return len(self.get_pserver_endpoints())
    def node_num(self):
        """
...@@ -421,8 +675,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set
        # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002
        self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
        self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
                                           "").split(",")
        if self._server_endpoints is None:
            # back to non_distributed execution.
            self._server_endpoints = ""
...@@ -436,6 +689,13 @@ class PaddleCloudRoleMaker(RoleMakerBase):
            return
        self._server_endpoints = self._server_endpoints.split(",")
self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
if self._worker_endpoints:
self._worker_endpoints = self._worker_endpoints.split(",")
else:
self._worker_endpoints = []
        trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"])
        training_role = os.environ["TRAINING_ROLE"]
...@@ -506,6 +766,7 @@ class PaddleCloudRoleMaker(RoleMakerBase):
            self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
            self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
            assert (self._training_role == "TRAINER")
self._role = Role.WORKER
            self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
            self._cur_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
            if self._worker_endpoints is None:
...@@ -518,74 +779,64 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        self._node_num = len(
            set([x.split(':')[0] for x in self._worker_endpoints]))
    def _init_gloo_env(self):
        def init_gloo_instance(role="trainer"):
            role = role.lower()
            assert role in ["trainer", "pserver", "all"]
            if role == "trainer":
                all_list = self._worker_endpoints
                rank = self._current_id
            elif role == "pserver":
                all_list = self._server_endpoints
                rank = self._current_id
            else:
                all_list = self._worker_endpoints + self._server_endpoints
                rank = all_list.index(self._cur_endpoint)
            gloo = fluid.core.Gloo()
            gloo.set_rank(rank)
            gloo.set_size(len(all_list))
            gloo.set_prefix(self._prefix)
            gloo.set_iface(self._iface)
            gloo.set_timeout_seconds(self._init_timeout_seconds,
                                     self._run_timeout_seconds)
            if len(self._http_ip_port) != 0:
                gloo.set_http_store(self._http_ip_port[0],
                                    int(self._http_ip_port[1]), role)
            else:
                gloo.set_hdfs_store(self._hdfs_path + "/" + role,
                                    self._hdfs_name, self._hdfs_ugi)
            gloo.init()
            return gloo

        # paddlecloud support gloo
        if self._role == Role.WORKER:
            if self._current_id == 0 and len(self._http_ip_port) != 0:
                size_d = {
                    "trainer": len(self._worker_endpoints),
                    "pserver": len(self._server_endpoints),
                    "all":
                    len(self._worker_endpoints) + len(self._server_endpoints)
                }
                # child process for http server
                self._http_server = Process(
                    target=self.__start_kv_server,
                    args=(self._http_server_d, size_d))
                self._http_server.daemon = True
                # set running status to True
                self._http_server_d["running"] = True
                # start child process
                self._http_server.start()
            self._node_type = 1
            gloo = init_gloo_instance("trainer")
            self._node_type_comm = gloo
        else:
            assert self._role == Role.SERVER
            self._node_type = 0
            gloo = init_gloo_instance("pserver")
            self._node_type_comm = gloo

        all_list = self._worker_endpoints + self._server_endpoints
        self._rank = all_list.index(self._cur_endpoint)
        self._size = len(all_list)

        gloo = init_gloo_instance("all")
        self._all_comm = gloo

        if self._http_server is not None:
            # set running status to False
            self._http_server_d["running"] = False
            # wait until child process exits
            self._http_server.join()
    def _gloo_init(self):
        # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier
        use_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0"))
        if use_gloo not in [1, 2]:
            return

        # PADDLE_GLOO_RENDEZVOUS 1: HDFS 2: FILE 3: HTTP
        rendezvous_type = int(os.getenv("PADDLE_GLOO_RENDEZVOUS", "0"))
        prefix = os.getenv("SYS_JOB_ID", "")
        if rendezvous_type not in [
                Gloo.RENDEZVOUS.HDFS, Gloo.RENDEZVOUS.HTTP, Gloo.RENDEZVOUS.FILE
        ]:
            raise ValueError(self._gloo._err_type)

        need_init_all = True if use_gloo == 2 else False

        if rendezvous_type == Gloo.RENDEZVOUS.HDFS:
            dfs_name = os.getenv("PADDLE_GLOO_FS_NAME", "")
            dfs_ugi = os.getenv("PADDLE_GLOO_FS_UGI", "")
            dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "")
            kwargs = {
                "dfs.name": dfs_name,
                "dfs.ugi": dfs_ugi,
                "dfs.path": dfs_path,
                "store.prefix": prefix,
            }
        elif rendezvous_type == Gloo.RENDEZVOUS.HTTP:
            ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "")
            port = os.getenv("PADDLE_GLOO_HTTP_PORT", "")
            kwargs = {
                "http.host": ip,
                "http.port": port,
                "store.prefix": prefix,
            }
        else:
            dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "")
            kwargs = {
                "dfs.path": dfs_path,
                "store.prefix": prefix,
            }

        if rendezvous_type == Gloo.RENDEZVOUS.HDFS:
            type = "HDFS"
        elif rendezvous_type == Gloo.RENDEZVOUS.HTTP:
            type = "HTTP"
        else:
            type = "FILE"
        print("Gloo init with {}: need_init_all: {}, args: {}".format(
            type, need_init_all, kwargs))

        self._gloo.init(
            rendezvous=rendezvous_type,
            role=self._role,
            role_id=self.role_id(),
            worker_num=self.worker_num(),
            server_num=self.server_num(),
            need_init_all=need_init_all,
            kwargs=kwargs)
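In other words, a launcher only has to export a handful of variables before generate_role() runs; a minimal sketch for the FILE rendezvous (values illustrative):

    import os, tempfile

    os.environ["PADDLE_WITH_GLOO"] = "1"        # 1: trainer barrier, 2: all barrier
    os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"  # 1: HDFS, 2: FILE, 3: HTTP
    os.environ["PADDLE_GLOO_FS_PATH"] = tempfile.mkdtemp()  # shared dir for FILE mode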
    def generate_role(self):
        """
...@@ -594,57 +845,10 @@ class PaddleCloudRoleMaker(RoleMakerBase):
        if not self._role_is_generated:
            if not self._is_collective:
                self._ps_env()
if "PADDLE_WITH_GLOO" in os.environ:
self._init_gloo = bool(os.environ["PADDLE_WITH_GLOO"])
if self._init_gloo:
self._init_gloo_env()
            else:
                self._collective_env()
            self._role_is_generated = True
self._gloo_init()
def __get_default_iface(self):
"""
get default physical interface
"""
default1 = self.__get_default_iface_from_gateway()
default2 = self.__get_default_iface_from_interfaces()
return default2 if default1 == "lo" else default1
def __get_default_iface_from_gateway(self):
"""
get default physical interface
"""
import netifaces
gateways = netifaces.gateways()
if gateways.get(netifaces.AF_INET) != None:
gateway = gateways[netifaces.AF_INET]
if len(gateway) > 0 and len(gateway[0]) > 1:
return gateway[0][1]
return "lo"
def __get_default_iface_from_interfaces(self):
"""
get default physical interface
"""
import netifaces
for intf_name in netifaces.interfaces():
addresses = netifaces.ifaddresses(intf_name)
if netifaces.AF_INET in addresses:
ipv4_addresses = addresses[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
if 'broadcast' in ipv4_address:
return intf_name
return "lo"
def __start_kv_server(self, http_server_d, size_d):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
time.sleep(wait_seconds)
http_server.stop()
class UserDefinedRoleMaker(PaddleCloudRoleMaker):
...@@ -677,7 +881,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
        self._worker_endpoints = self._kwargs.get("worker_endpoints")
        self._current_id = self._kwargs.get("current_id")
        self._trainers_num = len(self._worker_endpoints)
        self._training_role = Role.WORKER
        self._node_num = len(
            set([x.split(':')[0] for x in self._worker_endpoints]))
...@@ -688,8 +892,6 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker):
        if not self._role_is_generated:
            if not self._is_collective:
                self._user_defined_ps_env()
if self._init_gloo:
self._init_gloo_env()
            else:
                self._user_defined_collective_env()
            self._role_is_generated = True
...@@ -57,34 +57,7 @@ class UtilBase(object):
        ), "fs_client must be the instance of paddle.distributed.fleet.utils.FS"
        self.fs_client = fs_client
    def __check_comm_world(self, comm_world="worker"):
        if not self.role_maker._role_is_generated:
            self.role_maker.generate_role()
        _comm_world = None
        comm_world_upper = comm_world.upper()
        if comm_world_upper == "WORKER":
            if not self.role_maker.is_worker():
                print(
                    "warning: current role is not worker in collective_func(comm_world=\"worker\")"
                )
            _comm_world = self.role_maker._node_type_comm
        elif comm_world_upper == "SERVER":
            if not self.role_maker.is_server():
                print(
                    "warning: current role is not server in collective_func(comm_world=\"server\")"
                )
            _comm_world = self.role_maker._node_type_comm
        elif comm_world_upper == "ALL":
            _comm_world = self.role_maker._all_comm
        else:
            raise ValueError(
                "not support comm_world, please choose one from [worker, server, all]"
            )
        return _comm_world
    def all_reduce(self, input, mode="sum", comm_world="worker"):
        """
        All reduce `input` between specified collection. This is a distributed API.
...@@ -130,8 +103,7 @@ class UtilBase(object):
            if __name__ == "__main__":
                train()
        """
        _comm_world = self.__check_comm_world(comm_world)
        return self.role_maker._all_reduce(_comm_world, input, mode)
        return self.role_maker._all_reduce(input, mode, comm_world)
def barrier(self, comm_world="worker"): def barrier(self, comm_world="worker"):
""" """
...@@ -170,8 +142,7 @@ class UtilBase(object): ...@@ -170,8 +142,7 @@ class UtilBase(object):
if __name__ == "__main__": if __name__ == "__main__":
train() train()
""" """
_comm_world = self.__check_comm_world(comm_world) self.role_maker._barrier(comm_world)
self.role_maker._barrier(_comm_world)
    def all_gather(self, input, comm_world="worker"):
        """
...@@ -219,8 +190,8 @@ class UtilBase(object):
            if __name__ == "__main__":
                train()
        """
        _comm_world = self.__check_comm_world(comm_world)
        return self.role_maker._all_gather(_comm_world, input)
        return self.role_maker._all_gather(input, comm_world)
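With the comm_world plumbing now resolved inside the role maker, usage reduces to the following sketch (mirroring the tests later in this diff; util is an initialized UtilBase, e.g. fleet.util()):

    util.barrier("worker")
    summed = util.all_reduce(1, "sum", "worker")  # sum of the value across workers
    gathered = util.all_gather(1, "worker")       # one entry per worker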
    def _broadcast(self):
        pass
...
...@@ -55,7 +55,10 @@ launch a process on each of the given gpu card or cpu machine.
"""
from __future__ import print_function
import shutil
import sys
import tempfile
from sys import version
import subprocess
import os
...@@ -213,12 +216,20 @@ def launch_collective(args):
    cluster, pod = get_cluster_from_args(args, gpus)
    logger.debug("get cluster from args:{}".format(cluster))
global_envs = copy.copy(os.environ.copy())
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
global_envs["PADDLE_WITH_GLOO"] = "1"
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "2"
global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
    procs = start_local_trainers(
        cluster,
        pod,
        training_script=args.training_script,
        training_script_args=args.training_script_args,
        log_dir=args.log_dir,
        envs=global_envs)
    while True:
        alive = watch_local_trainers(procs, cluster.trainers_nranks())
...@@ -230,6 +241,9 @@ def launch_collective(args):
        time.sleep(3)
if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
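The create-export-cleanup pattern used here, shown in isolation (a sketch, not the launcher itself):

    import os, shutil, tempfile

    gloo_dir = tempfile.mkdtemp()
    env = os.environ.copy()
    env["PADDLE_WITH_GLOO"] = "1"
    env["PADDLE_GLOO_RENDEZVOUS"] = "2"  # FILE rendezvous
    env["PADDLE_GLOO_FS_PATH"] = gloo_dir
    try:
        pass  # spawn trainer/server processes with `env` here
    finally:
        if os.path.exists(gloo_dir):
            shutil.rmtree(gloo_dir)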
def launch_ps(args):
    ports = None
...@@ -315,6 +329,13 @@ def launch_ps(args):
    default_env = os.environ.copy()
    current_env = copy.copy(default_env)
gloo_rendezvous_dir = tempfile.mkdtemp()
# add gloo env
current_env["PADDLE_WITH_GLOO"] = "1"
current_env["PADDLE_GLOO_RENDEZVOUS"] = "2"
current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir
current_env.pop("http_proxy", None) current_env.pop("http_proxy", None)
current_env.pop("https_proxy", None) current_env.pop("https_proxy", None)
procs = [] procs = []
...@@ -419,6 +440,9 @@ def launch_ps(args): ...@@ -419,6 +440,9 @@ def launch_ps(args):
procs[i].proc.terminate() procs[i].proc.terminate()
print("all parameter server are killed", file=sys.stderr) print("all parameter server are killed", file=sys.stderr)
if os.path.exists(gloo_rendezvous_dir):
shutil.rmtree(gloo_rendezvous_dir)
def launch():
    args = _parse_args()
...
...@@ -398,8 +398,14 @@ def start_local_trainers(cluster,
                         pod,
                         training_script,
                         training_script_args,
                         log_dir=None,
                         envs=None):
    if envs is None:
        current_env = copy.copy(os.environ.copy())
    else:
        current_env = copy.copy(envs)
    #paddle broadcast ncclUniqueId use socket, and
    #proxy maybe make trainers unreachable, so delete them.
    #if we set them to "", grpc will log error message "bad uri"
...
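A sketch of a call site for the extended signature (cluster and pod as returned by get_cluster_from_args; the script name is illustrative):

    procs = start_local_trainers(
        cluster,
        pod,
        training_script="train.py",
        training_script_args=[],
        log_dir="logs",
        envs=global_envs)  # falls back to os.environ when envs is None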
...@@ -27,7 +27,7 @@ class TestFleetBase(unittest.TestCase):
        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
            "127.0.0.1:36001,127.0.0.2:36001"
    def test_init(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
...@@ -88,7 +88,7 @@ class TestFleetBase(unittest.TestCase):
    def test_util(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        self.assertEqual(fleet.util(), None)
    def test_barrier_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
...@@ -99,20 +99,17 @@ class TestFleetBase(unittest.TestCase):
    def test_init_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        with self.assertRaises(ValueError):
            if fleet.is_worker():
                fleet.init_worker()
    def test_run_server(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        if fleet.is_worker():
            fleet.run_worker()
    def test_stop_worker(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        with self.assertRaises(ValueError):
            if fleet.is_worker():
                fleet.stop_worker()
    def test_distributed_optimizer(self):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
...
...@@ -15,7 +15,11 @@
from __future__ import print_function
import os
import platform
import shutil
import tempfile
import unittest
import paddle
import paddle.distributed.fleet.base.role_maker as role_maker
...@@ -42,9 +46,9 @@ class TestRoleMakerBase(unittest.TestCase):
        self.assertTrue(len(pserver_endpoints) == 0)
        print(role.to_string())
        self.assertTrue(role._all_gather(role._node_type_comm, 1) is None)
        self.assertTrue(role._all_reduce(role._node_type_comm, 1) is None)
        role._barrier(role._node_type_comm)
        self.assertTrue(role._all_gather(1, "worker") is None)
        self.assertTrue(role._all_reduce(1, "sum", "worker") is None)
        role._barrier("worker")
class TestCloudRoleMaker(unittest.TestCase):
...@@ -72,8 +76,8 @@ class TestCloudRoleMaker(unittest.TestCase):
            print("warning: no netifaces, skip test_tr_rolemaker")
            return
        ro = role_maker.PaddleCloudRoleMaker(
            is_collective=False, init_gloo=False)
        ro = role_maker.PaddleCloudRoleMaker(is_collective=False)
        self.assertTrue(ro.is_worker())
        self.assertFalse(ro.is_server())
        self.assertEqual(ro.worker_num(), 2)
...@@ -108,8 +112,9 @@ class TestCloudRoleMaker(unittest.TestCase):
        self.assertEqual(ro.server_num(), 2)
        pserver_endpoints = ro.get_pserver_endpoints()
        self.assertEqual(pserver_endpoints[0], '127.0.0.1:36001')
        self.assertTrue(ro._all_gather(ro._all_comm, 1) is None)
        self.assertTrue(ro._all_reduce(ro._all_comm, 1) is None)
        self.assertEqual(ro._all_gather(1, "worker"), 1)
        self.assertEqual(ro._all_reduce(1, "sum", "worker"), 1)
    def test_traing_role(self):
        """Test training role."""
...@@ -142,7 +147,7 @@ class TestUserDefinedRoleMaker(unittest.TestCase):
        ro = role_maker.UserDefinedRoleMaker(
            is_collective=False,
            init_gloo=False,
            server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"],
            role=role_maker.Role.SERVER,
            current_id=0,
            worker_num=2)
...@@ -161,14 +166,274 @@ class TestUserDefinedRoleMaker(unittest.TestCase):
        ro = role_maker.UserDefinedRoleMaker(
            is_collective=False,
            init_gloo=False,
            server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"],
            role=role_maker.Role.WORKER,
            current_id=0,
            worker_num=2)
        self.assertIn("127.0.0.1:36001", ro.get_pserver_endpoints())
        self.assertTrue(ro.is_worker())
        self.assertEqual(ro.role_id(), 0)
class TestGlooWithCloudRoleMaker(unittest.TestCase):
def setUp(self):
os.environ["PADDLE_TRAINERS_NUM"] = "1"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ID"] = "0"
def case(self, role, comm_world):
role._barrier(comm_world)
gather = role._all_gather(1, comm_world)
self.assertEqual(gather[0], 1)
all_reduce = role._all_reduce(1, "sum", comm_world)
self.assertEqual(1, all_reduce)
def mkdir(self):
tmp = tempfile.mkdtemp()
return tmp
def clean(self, tmp):
shutil.rmtree(tmp)
def test_hdfs_gloo(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "worker")
self.clean(tmp)
def test_fs_gloo(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "worker")
self.clean(tmp)
def test_fs_gloo2(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.clean(tmp)
def test_fs_gloo3(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.clean(tmp)
def test_fs_gloo4(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3"
os.environ["PADDLE_GLOO_HTTP_HOST"] = "127.0.0.1"
os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019"
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
import time
time.sleep(3)
def test_fs_gloo5(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.case(role, "all")
self.clean(tmp)
def test_fs_gloo6(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
role = role_maker.PaddleCloudRoleMaker()
role.generate_role()
self.case(role, "server")
self.case(role, "all")
self.clean(tmp)
def test_fs_gloo7(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "1"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5"
role = role_maker.PaddleCloudRoleMaker()
self.assertRaises(ValueError, role.generate_role)
def test_fs_gloo8(self):
plats = platform.platform()
if 'Linux' not in plats:
print("skip gloo UT on MacOS/Win")
return
tmp = self.mkdir()
os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["PADDLE_TRAINERS_NUM"] = "0"
os.environ["SYS_JOB_ID"] = "gloo_for_cluster"
os.environ["PADDLE_WITH_GLOO"] = "2"
os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1"
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL"
os.environ["PADDLE_GLOO_FS_UGI"] = "NULL"
os.environ["PADDLE_GLOO_FS_PATH"] = tmp
def net():
x = paddle.fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = paddle.fluid.layers.fc(input=x, size=1, act=None)
y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32')
cost = paddle.fluid.layers.square_error_cost(
input=y_predict, label=y)
avg_cost = paddle.fluid.layers.mean(cost)
return avg_cost
from paddle.distributed import fleet
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
avg_cost = net()
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False
optimizer = paddle.optimizer.SGD(0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
comm_world = "server"
fleet.util().barrier(comm_world)
gather = fleet.util().all_gather(1, comm_world)
self.assertEqual(gather[0], 1)
all_reduce = fleet.util().all_reduce(1, "sum", comm_world)
self.assertEqual(1, all_reduce)
self.clean(tmp)
if __name__ == "__main__":
    unittest.main()
...@@ -59,7 +59,7 @@ class TestFleetUtil(unittest.TestCase):
        import paddle.distributed.fleet.base.role_maker as role_maker
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        default_util = fleet.util()
        self.assertEqual(default_util, None)
    def test_set_user_defined_util(self):
...@@ -76,8 +76,8 @@ class TestFleetUtil(unittest.TestCase):
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        my_util = UserDefinedUtil()
        fleet.set_util(my_util)
        user_id = fleet.util().get_user_id()
        self.assertEqual(user_id, 10)
    def test_fs(self):
...@@ -88,97 +88,6 @@ class TestFleetUtil(unittest.TestCase):
        self.assertFalse(fs.need_upload_download())
        fleet_util._set_file_system(fs)
def test_barrier(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_barrier")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_barrier", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.SERVER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
fleet_util.barrier("worker")
def test_all_reduce(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_all_reduce")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.WORKER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
output = fleet_util.all_reduce(1, "sum", comm_world="server")
print(output)
# self.assertEqual(output, 1)
def test_all_gather(self):
try:
import netifaces
except:
print("warning: no netifaces, skip test_all_gather")
return
gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("123")
gloo.set_iface("lo")
gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "")
gloo.init()
role = role_maker.UserDefinedRoleMaker(
is_collective=False,
init_gloo=False,
current_id=0,
role=role_maker.Role.SERVER,
worker_endpoints=["127.0.0.1:6003"],
server_endpoints=["127.0.0.1:6001"])
role._node_type_comm = gloo
role._all_comm = gloo
role._role_is_generated = True
fleet_util._set_role_maker(role)
output = fleet_util.all_gather(1, comm_world="all")
print(output)
# self.assertTrue(len(output) == 1 and output[0] == 1)
self.assertRaises(Exception, fleet_util.all_gather, 1, "test")
    def download_files(self):
        path = download(self.proto_data_url, self.module_name,
                        self.proto_data_md5)
...
...@@ -474,6 +474,141 @@ class TestTransformer(unittest.TestCase):
            trans_output = transformer(src, tgt, src_mask, tgt_mask,
                                       memory_mask)
def test_transformer_attr_1(self):
batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params(
mode="decoder_layer")
# batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8
with fluid.dygraph.guard(fluid.CPUPlace()):
transformer = Transformer(
d_model,
n_head,
dim_feedforward=dim_feedforward,
dropout=dropout,
weight_attr=[None],
bias_attr=[False])
src = paddle.to_variable(
np.random.rand(batch_size, source_length, d_model).astype(
"float32"))
tgt = paddle.to_variable(
np.random.rand(batch_size, target_length, d_model).astype(
"float32"))
src_mask = np.zeros((batch_size, n_head, source_length,
source_length)).astype("float32")
src_mask[0][0][0][0] = -np.inf
src_mask = paddle.to_variable(src_mask)
tgt_mask = np.zeros((batch_size, n_head, target_length,
target_length)).astype("float32")
tgt_mask[0][0][0][0] = -1e9
memory_mask = np.zeros((batch_size, n_head, target_length,
source_length)).astype("float32")
memory_mask[0][0][0][0] = -1e9
tgt_mask, memory_mask = paddle.to_variable(
tgt_mask), paddle.to_variable(memory_mask)
trans_output = transformer(src, tgt, src_mask, tgt_mask,
memory_mask)
def test_transformer_attr_2(self):
batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params(
mode="decoder_layer")
# batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8
with fluid.dygraph.guard(fluid.CPUPlace()):
transformer = Transformer(
d_model,
n_head,
dim_feedforward=dim_feedforward,
dropout=dropout,
weight_attr=[None, None],
bias_attr=[False, False])
src = paddle.to_variable(
np.random.rand(batch_size, source_length, d_model).astype(
"float32"))
tgt = paddle.to_variable(
np.random.rand(batch_size, target_length, d_model).astype(
"float32"))
src_mask = np.zeros((batch_size, n_head, source_length,
source_length)).astype("float32")
src_mask[0][0][0][0] = -np.inf
src_mask = paddle.to_variable(src_mask)
tgt_mask = np.zeros((batch_size, n_head, target_length,
target_length)).astype("float32")
tgt_mask[0][0][0][0] = -1e9
memory_mask = np.zeros((batch_size, n_head, target_length,
source_length)).astype("float32")
memory_mask[0][0][0][0] = -1e9
tgt_mask, memory_mask = paddle.to_variable(
tgt_mask), paddle.to_variable(memory_mask)
trans_output = transformer(src, tgt, src_mask, tgt_mask,
memory_mask)
def test_transformer_attr_3(self):
batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params(
mode="decoder_layer")
# batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8
with fluid.dygraph.guard(fluid.CPUPlace()):
transformer = Transformer(
d_model,
n_head,
dim_feedforward=dim_feedforward,
dropout=dropout,
weight_attr=[None, None, None],
bias_attr=[False, False, True])
src = paddle.to_variable(
np.random.rand(batch_size, source_length, d_model).astype(
"float32"))
tgt = paddle.to_variable(
np.random.rand(batch_size, target_length, d_model).astype(
"float32"))
src_mask = np.zeros((batch_size, n_head, source_length,
source_length)).astype("float32")
src_mask[0][0][0][0] = -np.inf
src_mask = paddle.to_variable(src_mask)
tgt_mask = np.zeros((batch_size, n_head, target_length,
target_length)).astype("float32")
tgt_mask[0][0][0][0] = -1e9
memory_mask = np.zeros((batch_size, n_head, target_length,
source_length)).astype("float32")
memory_mask[0][0][0][0] = -1e9
tgt_mask, memory_mask = paddle.to_variable(
tgt_mask), paddle.to_variable(memory_mask)
trans_output = transformer(src, tgt, src_mask, tgt_mask,
memory_mask)
def test_transformer_attr_boolean(self):
batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params(
mode="decoder_layer")
# batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8
with fluid.dygraph.guard(fluid.CPUPlace()):
transformer = Transformer(
d_model,
n_head,
dim_feedforward=dim_feedforward,
dropout=dropout,
bias_attr=False)
src = paddle.to_variable(
np.random.rand(batch_size, source_length, d_model).astype(
"float32"))
tgt = paddle.to_variable(
np.random.rand(batch_size, target_length, d_model).astype(
"float32"))
src_mask = np.zeros((batch_size, n_head, source_length,
source_length)).astype("float32")
src_mask[0][0][0][0] = -np.inf
src_mask = paddle.to_variable(src_mask)
tgt_mask = np.zeros((batch_size, n_head, target_length,
target_length)).astype("float32")
tgt_mask[0][0][0][0] = -1e9
memory_mask = np.zeros((batch_size, n_head, target_length,
source_length)).astype("float32")
memory_mask[0][0][0][0] = -1e9
tgt_mask, memory_mask = paddle.to_variable(
tgt_mask), paddle.to_variable(memory_mask)
trans_output = transformer(src, tgt, src_mask, tgt_mask,
memory_mask)
if __name__ == "__main__":
    unittest.main()
...@@ -53,7 +53,22 @@ def _convert_param_attr_to_list(param_attr, n):
    if isinstance(param_attr, (list, tuple)):
        assert len(param_attr) == n, (
            "length of param_attr should be %d when it is a list/tuple" % n)
        param_attrs = []
for attr in param_attr:
if isinstance(attr, bool):
if attr:
param_attrs.append(ParamAttr._to_attr(None))
else:
param_attrs.append(False)
else:
param_attrs.append(ParamAttr._to_attr(attr))
# param_attrs = [ParamAttr._to_attr(attr) for attr in param_attr]
elif isinstance(param_attr, bool):
param_attrs = []
if param_attr:
param_attrs = [ParamAttr._to_attr(None) for i in range(n)]
else:
param_attrs = [False] * n
    else:
        param_attrs = []
        attr = ParamAttr._to_attr(param_attr)
...@@ -417,7 +432,7 @@ class TransformerEncoderLayer(Layer):
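For reference, a sketch of what the bool handling in the hunk above returns (n is the requested length; ParamAttr(...) stands for a default-constructed attribute):

    _convert_param_attr_to_list(False, 2)          # -> [False, False]
    _convert_param_attr_to_list(True, 2)           # -> [ParamAttr(...), ParamAttr(...)]
    _convert_param_attr_to_list([None, False], 2)  # -> [ParamAttr(...), False]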
            Otherwise, MHA and FFN both use it as `weight_attr` to create parameters.
            Default: None, which means the default weight parameter property is used.
            See usage for details in :code:`ParamAttr` .
        bias_attr (ParamAttr|tuple|bool, optional): To specify the bias parameter property.
            If it is a tuple, `bias_attr[0]` would be used as `bias_attr` for
            MHA, and `bias_attr[1]` would be used as `bias_attr` for linear in FFN.
            Otherwise, MHA and FFN both use it as `bias_attr` to create parameters.
...@@ -986,22 +1001,31 @@ class Transformer(Layer):
            Otherwise, no pre-process and post-process includes dropout, residual
            connection, layer normalization. Default False
        weight_attr(ParamAttr|tuple, optional): To specify the weight parameter property.
            If it is a tuple, the length of `weight_attr` could be 1, 2 or 3. If it is 3,
            `weight_attr[0]` would be used as `weight_attr` for self attention, `weight_attr[1]`
            would be used as `weight_attr` for cross attention of `TransformerDecoder`,
            and `weight_attr[2]` would be used as `weight_attr` for linear in FFN.
            If it is 2, `weight_attr[0]` would be used as `weight_attr` both for self attention
            and cross attention and `weight_attr[1]` would be used as `weight_attr` for
            linear in FFN. If it is 1, `weight_attr[0]` would be used as `weight_attr`
            for self attention, cross attention and linear in FFN. Otherwise,
            the three sub-layers all use it as `weight_attr` to create parameters.
            Default: None, which means the default weight parameter property is used.
            See usage for details in :code:`ParamAttr` .
        bias_attr (ParamAttr|tuple, optional): To specify the bias parameter property.
            If it is a tuple, the length of `bias_attr` could be 1, 2 or 3. If it is 3,
            `bias_attr[0]` would be used as `bias_attr` for self attention, `bias_attr[1]`
            would be used as `bias_attr` for cross attention of `TransformerDecoder`,
            and `bias_attr[2]` would be used as `bias_attr` for linear in FFN.
            If it is 2, `bias_attr[0]` would be used as `bias_attr` both for self attention
            and cross attention and `bias_attr[1]` would be used as `bias_attr` for
            linear in FFN. If it is 1, `bias_attr[0]` would be used as `bias_attr`
            for self attention, cross attention and linear in FFN. Otherwise,
            the three sub-layers all use it as `bias_attr` to create parameters.
            The `False` value means the corresponding layer would not have trainable
            bias parameter. See usage for details in :code:`ParamAttr` .
            Default: None, which means the default bias parameter property is used.
        custom_encoder (Layer): If custom encoder is provided, use it as the encoder.
            Default None
        custom_decoder (Layer): If custom decoder is provided, use it as the decoder.
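The tests added earlier in this diff exercise exactly these lengths; condensed here as a sketch (assuming d_model and n_head are defined, as in generate_basic_params):

    # length 1: one attr shared by self attention, cross attention and FFN
    Transformer(d_model, n_head, weight_attr=[None], bias_attr=[False])
    # length 2: [attention (self and cross), linear in FFN]
    Transformer(d_model, n_head, weight_attr=[None, None], bias_attr=[False, False])
    # length 3: [self attention, cross attention, linear in FFN]
    Transformer(d_model, n_head, weight_attr=[None, None, None],
                bias_attr=[False, False, True])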
...@@ -1049,13 +1073,51 @@ class Transformer(Layer):
                 custom_decoder=None):
        super(Transformer, self).__init__()
if isinstance(bias_attr, (list, tuple)):
if len(bias_attr) == 1:
encoder_bias_attr = [bias_attr[0]] * 2
decoder_bias_attr = [bias_attr[0]] * 3
elif len(bias_attr) == 2:
encoder_bias_attr = bias_attr
decoder_bias_attr = [bias_attr[0], bias_attr[0], bias_attr[-1]]
elif len(bias_attr) == 3:
encoder_bias_attr = [bias_attr[0], bias_attr[-1]]
decoder_bias_attr = bias_attr
else:
assert False, (
"length of bias_attr should be 1 or 2 or 3 when it is a list/tuple"
)
else:
encoder_bias_attr = bias_attr
decoder_bias_attr = bias_attr
if isinstance(weight_attr, (list, tuple)):
if len(weight_attr) == 1:
encoder_weight_attr = [weight_attr[0]] * 2
decoder_weight_attr = [weight_attr[0]] * 3
elif len(weight_attr) == 2:
encoder_weight_attr = weight_attr
decoder_weight_attr = [
weight_attr[0], weight_attr[0], weight_attr[-1]
]
elif len(weight_attr) == 3:
encoder_weight_attr = [weight_attr[0], weight_attr[-1]]
decoder_weight_attr = weight_attr
else:
assert False, (
"length of weight_attr should be 1 or 2 or 3 when it is a list/tuple"
)
else:
encoder_weight_attr = weight_attr
decoder_weight_attr = weight_attr
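        # For example: a length-3 bias_attr maps to encoder [bias_attr[0],
        # bias_attr[-1]] (self attention, FFN) and decoder bias_attr as-is
        # (self attention, cross attention, FFN); a length-1 list is broadcast
        # to two encoder slots and three decoder slots.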
        if custom_encoder is not None:
            self.encoder = custom_encoder
        else:
            encoder_layer = TransformerEncoderLayer(
                d_model, nhead, dim_feedforward, dropout, activation,
                attn_dropout, act_dropout, normalize_before,
                encoder_weight_attr, encoder_bias_attr)
            encoder_norm = LayerNorm(d_model)
            self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers,
                                              encoder_norm)
...@@ -1065,8 +1127,8 @@ class Transformer(Layer):
        else:
            decoder_layer = TransformerDecoderLayer(
                d_model, nhead, dim_feedforward, dropout, activation,
                attn_dropout, act_dropout, normalize_before,
                decoder_weight_attr, decoder_bias_attr)
            decoder_norm = LayerNorm(d_model)
            self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers,
                                              decoder_norm)
...