diff --git a/cmake/cuda.cmake b/cmake/cuda.cmake index 7a94bda0f5f73e48081f68d7b2730e3df1e46232..c78fe5f6c7fbd44e0820747f200e3e8168dc3783 100644 --- a/cmake/cuda.cmake +++ b/cmake/cuda.cmake @@ -107,6 +107,9 @@ function(select_nvcc_arch_flags out_variable) elseif(${CUDA_ARCH_NAME} STREQUAL "Maxwell") set(cuda_arch_bin "50") elseif(${CUDA_ARCH_NAME} STREQUAL "Pascal") + if (NOT ${CMAKE_CUDA_COMPILER_VERSION} LESS 10.0) + add_definitions("-DSUPPORTS_CUDA_FP16") + endif() set(cuda_arch_bin "60 61") elseif(${CUDA_ARCH_NAME} STREQUAL "Volta") if (NOT ${CMAKE_CUDA_COMPILER_VERSION} LESS 10.0) diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 96d54ec86917432837d61f681ece91da2ddcab10..aec27bd9d91e5afb6bf11037e60ff213162ad97f 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -527,6 +527,8 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { VLOG(0) << "error: the number of ids is a negative number: " << num; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " << all_slots_.size(); return false; } else if (num == 0) { VLOG(0) @@ -536,42 +538,66 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) { "characters."; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " << all_slots_.size(); return false; } else if (errno == ERANGE || num > INT_MAX) { VLOG(0) << "error: the number of ids greater than INT_MAX"; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " << all_slots_.size(); return false; } if (all_slots_type_[i] == "float") { - for (int i = 0; i < num; ++i) { + for (int j = 0; j < num; ++j) { strtof(endptr, 
&endptr); if (errno == ERANGE) { VLOG(0) << "error: the value is out of the range of " "representable values for float"; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " + << all_slots_.size(); + VLOG(0) << "and in this slot: " << j + << " th id with total id number: " << num; return false; } - if (i + 1 != num && endptr - str == len) { + if (j + 1 != num && endptr - str == len) { VLOG(0) << "error: there is a wrong with the number of ids."; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " + << all_slots_.size(); + VLOG(0) << "and in this slot: " << j + << " th id with total id number: " << num; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; return false; } } } else if (all_slots_type_[i] == "uint64") { - for (int i = 0; i < num; ++i) { + for (int j = 0; j < num; ++j) { strtoull(endptr, &endptr, 10); if (errno == ERANGE) { VLOG(0) << "error: the value is out of the range of " "representable values for uint64_t"; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " + << all_slots_.size(); + VLOG(0) << "and in this slot: " << j + << " th id with total id number: " << num; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; return false; } - if (i + 1 != num && endptr - str == len) { + if (j + 1 != num && endptr - str == len) { VLOG(0) << "error: there is a wrong with the number of ids."; + VLOG(0) << "Error occured when parsing " << i + << " th slot with total slots number: " + << all_slots_.size(); + VLOG(0) << "and in this slot: " << j + << " th id with total id number: " << num; VLOG(0) << "please check line<" << instance_cout << "> in file<" << filename << ">"; return false; @@ -632,8 +658,13 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe( "The number of ids can not be zero, you need 
padding " "it in data generator; or if there is something wrong with " "the data, please check if the data contains unresolvable " - "characters.\nplease check this error line: %s", - str)); + "characters.\nplease check this error line: %s, \n Specifically, " + "something wrong happened(the length of this slot's feasign is 0)" + "when we parse the %d th slots." + "Maybe something wrong around this slot", + "\nWe detect the feasign number of this slot is %d, " + "which is illegal.", + str, i, num)); if (idx != -1) { (*instance)[idx].Init(all_slots_type_[i]); if ((*instance)[idx].GetType()[0] == 'f') { // float @@ -683,8 +714,13 @@ bool MultiSlotDataFeed::ParseOneInstance(std::vector* instance) { "The number of ids can not be zero, you need padding " "it in data generator; or if there is something wrong with " "the data, please check if the data contains unresolvable " - "characters.\nplease check this error line: %s.", - str)); + "characters.\nplease check this error line: %s, \n Specifically, " + "something wrong happened(the length of this slot's feasign is 0)" + "when we parse the %d th slots." + "Maybe something wrong around this slot", + "\nWe detect the feasign number of this slot is %d, " + "which is illegal.", + str, i, num)); if (idx != -1) { (*instance)[idx].Init(all_slots_type_[i]); @@ -916,8 +952,13 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstanceFromPipe(Record* instance) { "The number of ids can not be zero, you need padding " "it in data generator; or if there is something wrong with " "the data, please check if the data contains unresolvable " - "characters.\nplease check this error line: %s.", - str)); + "characters.\nplease check this error line: %s, \n Specifically, " + "something wrong happened(the length of this slot's feasign is 0)" + "when we parse the %d th slots." 
+ "Maybe something wrong around this slot", + "\nWe detect the feasign number of this slot is %d, " + "which is illegal.", + str, i, num)); if (idx != -1) { if (all_slots_type_[i][0] == 'f') { // float for (int j = 0; j < num; ++j) { @@ -982,8 +1023,13 @@ bool MultiSlotInMemoryDataFeed::ParseOneInstance(Record* instance) { "The number of ids can not be zero, you need padding " "it in data generator; or if there is something wrong with " "the data, please check if the data contains unresolvable " - "characters.\nplease check this error line: %s.", - str)); + "characters.\nplease check this error line: %s, \n Specifically, " + "something wrong happened(the length of this slot's feasign is 0)" + "when we parse the %d th slots." + "Maybe something wrong around this slot", + "\nWe detect the feasign number of this slot is %d, " + "which is illegal.", + str, i, num)); if (idx != -1) { if (all_slots_type_[i][0] == 'f') { // float diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc index bb958f1ac015bfd1a71b3ccd530406a33e4e37cb..f195dde40843c8c4ee5168d11ad0b8eac8199f4e 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.cc +++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc @@ -19,6 +19,8 @@ limitations under the License. 
*/ namespace gloo { namespace rendezvous { +constexpr int kNodeSize = 136; + HdfsStore::HdfsStore(const std::string& path) { path_ = path; wait_sleep_ms_ = 10000; @@ -213,12 +215,14 @@ void ParallelConnectContext::connectFullMesh( storeKey << rank; store.set(storeKey.str(), allBytes); + auto total_add_size = kNodeSize * (size - 1); + std::vector> connect_threads(thread_num_); // Connect every pair for (uint32_t i = 0; i < connect_threads.size(); ++i) { connect_threads[i].reset(new std::thread( - [&store, &transportContext, this](size_t thread_idx, - size_t thread_num) -> void { + [&store, &transportContext, total_add_size, this]( + size_t thread_idx, size_t thread_num) -> void { for (int i = thread_idx; i < size; i += thread_num) { if (i == rank) { continue; @@ -226,8 +230,23 @@ void ParallelConnectContext::connectFullMesh( // Wait for address of other side of this pair to become available std::string key = std::to_string(i); store.wait({key}, getTimeout()); + + std::vector allAddrs; + auto max_retry_times = 5; // Connect to other side of this pair - auto allAddrs = store.get(key); + + while (max_retry_times > 0) { + allAddrs = store.get(key); + + VLOG(3) << "store get all address size: " << allAddrs.size() + << " except: " << total_add_size; + if (allAddrs.size() == static_cast(total_add_size)) { + break; + } + + --max_retry_times; + } + auto addr = extractAddress(allAddrs, i); transportContext->getPair(i)->connect(addr); } diff --git a/paddle/fluid/inference/tensorrt/convert/emb_eltwise_layernorm.cc b/paddle/fluid/inference/tensorrt/convert/emb_eltwise_layernorm.cc index cdc0e415d46739c646cc2a26dfd6ec5333973b25..9fff558c583596215c191a31e95b4e9b2aad058b 100644 --- a/paddle/fluid/inference/tensorrt/convert/emb_eltwise_layernorm.cc +++ b/paddle/fluid/inference/tensorrt/convert/emb_eltwise_layernorm.cc @@ -80,10 +80,10 @@ class EmbEltwiseLayerNormOpConverter : public OpConverter { nvinfer1::ILayer* layer = nullptr; if (engine_->with_dynamic_shape()) { - 
plugin::DynamicPluginTensorRT* plugin = nullptr; - plugin = new plugin::EmbEltwiseLayernormPluginDynamic( + auto use_fp16 = engine_->WithFp16(); + auto plugin = new plugin::EmbEltwiseLayernormPluginDynamic( input_embs, bias, scale, emb_sizes, bias_size, scale_size, hidden, - eps); + eps, use_fp16); layer = engine_->AddPluginV2(input_ids.data(), input_num, plugin); } else { PADDLE_THROW(platform::errors::Fatal( diff --git a/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.cu b/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.cu index 5e43be90de3dbbfef3c7d3def7e37904bb644380..873631fea614cc18cdc2b2b2f27d2480aa71d50b 100644 --- a/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.cu +++ b/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.cu @@ -32,13 +32,34 @@ namespace plugin { #if IS_TRT_VERSION_GE(6000) template -int EmbEltwiseLayernormPluginDynamic::initialize() { +EmbEltwiseLayernormPluginDynamicImpl< + T>::~EmbEltwiseLayernormPluginDynamicImpl() { + this->terminate(); +} + +inline half fp32tofp16(float x) { return static_cast(x); } + +template +int EmbEltwiseLayernormPluginDynamicImpl::initialize() { embs_gpu_.resize(embs_.size()); for (int i = 0; i < embs_.size(); i++) { if (embs_[i]) { - cudaMalloc(&embs_gpu_[i], sizeof(float) * emb_sizes_[i]); - cudaMemcpy(embs_gpu_[i], embs_[i], emb_sizes_[i] * sizeof(float), + T *host_ptr; + auto size = emb_sizes_[i]; + + if (std::is_same::value) { + host_ptr = new T[size]; + std::transform(embs_[i], (embs_[i] + size), host_ptr, fp32tofp16); + } else { + host_ptr = reinterpret_cast(embs_[i]); + } + + cudaMalloc(&embs_gpu_[i], sizeof(T) * size); + cudaMemcpy(embs_gpu_[i], host_ptr, size * sizeof(T), cudaMemcpyHostToDevice); + if (std::is_same::value) { + delete[] host_ptr; + } } } @@ -53,11 +74,105 @@ int EmbEltwiseLayernormPluginDynamic::initialize() { cudaMemcpyHostToDevice); } + int input_num = embs_.size(); + in_ptr_tensor_.Resize({input_num}); + 
emb_ptr_tensor_.Resize({input_num}); + + cudaGetDevice(&device_id_); + auto emb_ptr_gpu_d = + emb_ptr_tensor_.mutable_data(platform::CUDAPlace(device_id_)); + cudaMemcpy(emb_ptr_gpu_d, embs_gpu_.data(), sizeof(uintptr_t) * input_num, + cudaMemcpyHostToDevice); + return 0; } template -nvinfer1::DimsExprs EmbEltwiseLayernormPluginDynamic::getOutputDimensions( +void EmbEltwiseLayernormPluginDynamicImpl::terminate() { + for (int i = 0; i < embs_gpu_.size(); ++i) { + if (embs_gpu_[i]) { + cudaFree(embs_gpu_[i]); + embs_gpu_[i] = nullptr; + } + } + + if (bias_gpu_) { + cudaFree(bias_gpu_); + bias_gpu_ = nullptr; + } + + if (scale_gpu_) { + cudaFree(scale_gpu_); + scale_gpu_ = nullptr; + } +} + +template +int EmbEltwiseLayernormPluginDynamicImpl::enqueue( + const nvinfer1::PluginTensorDesc *input_desc, + const nvinfer1::PluginTensorDesc *output_desc, const void *const *inputs, + void *const *outputs, void *workspace, cudaStream_t stream) { + auto id_dims = input_desc[0].dims; + int batch = id_dims.d[0]; + int seq_len = id_dims.d[1]; + int input_num = embs_.size(); + + auto in_ptr_gpu_d = + in_ptr_tensor_.mutable_data(platform::CUDAPlace(device_id_)); + auto emb_ptr_gpu_d = + emb_ptr_tensor_.mutable_data(platform::CUDAPlace(device_id_)); + + auto new_input_ptr = reinterpret_cast(inputs[0]); + + if (old_input_ptr_ != new_input_ptr) { + old_input_ptr_ = new_input_ptr; + + cudaMemcpyAsync(in_ptr_gpu_d, reinterpret_cast(inputs), + sizeof(uintptr_t) * input_num, cudaMemcpyHostToDevice, + stream); + } + + auto out_type = output_desc[0].type; + + if (std::is_same::value) { + PADDLE_ENFORCE_EQ( + out_type == nvinfer1::DataType::kFLOAT, true, + platform::errors::InvalidArgument( + "The EmbEltwiseLayernorm Plugin only support fp32 input.")); + } else if (std::is_same::value) { + PADDLE_ENFORCE_EQ( + out_type == nvinfer1::DataType::kHALF, true, + platform::errors::InvalidArgument( + "The EmbEltwiseLayernorm Plugin only support fp16 input.")); + } else { + 
PADDLE_THROW(platform::errors::Fatal( + "Unsupport data type, the out type of EmbEltwiseLayernorm should be " + "float or half.")); + } + + auto *output_d = reinterpret_cast(outputs[0]); + + operators::math::EmbEltwiseLayerNormFunctor emb_eltwise_layernorm_func; + emb_eltwise_layernorm_func(batch, seq_len, hidden_size_, in_ptr_gpu_d, + scale_gpu_, bias_gpu_, emb_ptr_gpu_d, output_d, + eps_, input_num, stream); + return cudaGetLastError() != cudaSuccess; +} + +template class EmbEltwiseLayernormPluginDynamicImpl; +#ifdef SUPPORTS_CUDA_FP16 +template class EmbEltwiseLayernormPluginDynamicImpl; +#endif // SUPPORTS_CUDA_FP16 + +int EmbEltwiseLayernormPluginDynamic::initialize() { + impl_->initialize(); + + return 0; +} + +void EmbEltwiseLayernormPluginDynamic::terminate() { impl_->terminate(); } + +nvinfer1::DimsExprs EmbEltwiseLayernormPluginDynamic::getOutputDimensions( int output_index, const nvinfer1::DimsExprs *inputs, int nb_inputs, nvinfer1::IExprBuilder &expr_builder) { // NOLINT PADDLE_ENFORCE_EQ(output_index, 0, @@ -76,18 +191,7 @@ nvinfer1::DimsExprs EmbEltwiseLayernormPluginDynamic::getOutputDimensions( return ret; } -template -void EmbEltwiseLayernormPluginDynamic::terminate() { - for (auto ptr : embs_gpu_) { - if (ptr) cudaFree(ptr); - } - - if (bias_gpu_) cudaFree(bias_gpu_); - if (scale_gpu_) cudaFree(scale_gpu_); -} - -template -bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination( +bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination( int pos, const nvinfer1::PluginTensorDesc *in_out, int nb_inputs, int nb_outputs) { PADDLE_ENFORCE_NOT_NULL( @@ -98,6 +202,11 @@ bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination( "The EmbEltwiseLayerNorm's output should be one" "but it's (%d) outputs.", nb_outputs)); + PADDLE_ENFORCE_EQ(nb_outputs, 1, + platform::errors::InvalidArgument( + "The EmbEltwiseLayerNorm's output should be one" + "but it's (%d) outputs.", + nb_outputs)); PADDLE_ENFORCE_LT( pos, nb_inputs + nb_outputs, 
platform::errors::InvalidArgument("The pos(%d) should be less than the " @@ -122,7 +231,7 @@ bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination( } if (pos == all_nums - 1) { - if (sizeof(T) == sizeof(float)) { + if (with_fp16_ == false) { return desc.type == nvinfer1::DataType::kFLOAT; } else { return desc.type == nvinfer1::DataType::kHALF; @@ -131,84 +240,27 @@ bool EmbEltwiseLayernormPluginDynamic::supportsFormatCombination( return false; } -template -nvinfer1::DataType EmbEltwiseLayernormPluginDynamic::getOutputDataType( +nvinfer1::DataType EmbEltwiseLayernormPluginDynamic::getOutputDataType( int index, const nvinfer1::DataType *input_types, int nb_inputs) const { PADDLE_ENFORCE_EQ( index, 0, platform::errors::InvalidArgument( "The EmbEltwiseLayernorm Plugin only has one input, so the " "index value should be 0, but get %d.", index)); - return nvinfer1::DataType::kFLOAT; + if (with_fp16_) + return nvinfer1::DataType::kHALF; + else + return nvinfer1::DataType::kFLOAT; } -template -int EmbEltwiseLayernormPluginDynamic::enqueue( +int EmbEltwiseLayernormPluginDynamic::enqueue( const nvinfer1::PluginTensorDesc *input_desc, const nvinfer1::PluginTensorDesc *output_desc, const void *const *inputs, void *const *outputs, void *workspace, cudaStream_t stream) { - auto id_dims = input_desc[0].dims; - int batch = id_dims.d[0]; - int seq_len = id_dims.d[1]; - int input_num = embs_.size(); - - framework::Tensor in_ptr_tensor, emb_ptr_tensor; - int device_id; - cudaGetDevice(&device_id); - - in_ptr_tensor.Resize({input_num}); - emb_ptr_tensor.Resize({input_num}); - int64_t *in_ptr_gpu_d = - in_ptr_tensor.mutable_data(platform::CUDAPlace(device_id)); - int64_t *emb_ptr_gpu_d = - emb_ptr_tensor.mutable_data(platform::CUDAPlace(device_id)); - - std::vector in_ptr, emb_ptr; - for (int i = 0; i < input_num; i++) { - in_ptr.push_back(reinterpret_cast(inputs[i])); - emb_ptr.push_back(reinterpret_cast(embs_gpu_[i])); - } - - cudaMemcpyAsync(in_ptr_gpu_d, in_ptr.data(), 
sizeof(int64_t) * input_num, - cudaMemcpyHostToDevice, stream); - cudaMemcpyAsync(emb_ptr_gpu_d, emb_ptr.data(), sizeof(int64_t) * input_num, - cudaMemcpyHostToDevice, stream); - - auto out_type = output_desc[0].type; - - const unsigned tpb = 256; - const dim3 grid(seq_len, batch, 1); - const dim3 block(tpb, 1, 1); - if (sizeof(T) == sizeof(float)) { - PADDLE_ENFORCE_EQ( - out_type == nvinfer1::DataType::kFLOAT, true, - platform::errors::InvalidArgument( - "The EmbEltwiseLayernorm Plugin only support fp32 input.")); - } else if (sizeof(T) == sizeof(int16_t)) { - PADDLE_ENFORCE_EQ( - out_type == nvinfer1::DataType::kHALF, true, - platform::errors::InvalidArgument( - "The EmbEltwiseLayernorm Plugin only support fp16 input.")); - } else { - PADDLE_THROW(platform::errors::Fatal( - "Unsupport data type, the out type of EmbEltwiseLayernorm should be " - "float or half.")); - } - - T *output_d = static_cast(outputs[0]); - - operators::math::EmbEltwiseLayerNormFunctor emb_eltwise_layernorm_func; - emb_eltwise_layernorm_func(batch, seq_len, hidden_size_, in_ptr_gpu_d, - scale_gpu_, bias_gpu_, emb_ptr_gpu_d, output_d, - eps_, input_num, stream); + impl_->enqueue(input_desc, output_desc, inputs, outputs, workspace, stream); return cudaGetLastError() != cudaSuccess; } -template class EmbEltwiseLayernormPluginDynamic; -#ifdef SUPPORTS_CUDA_FP16 -template class EmbEltwiseLayernormPluginDynamic; -#endif // SUPPORTS_CUDA_FP16 - #endif } // namespace plugin diff --git a/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.h b/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.h index 5babd87db0602e973452efa613fcaf9810d29afa..24ca853104e35c26a2f9add57fd2f8bc025646c2 100644 --- a/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.h +++ b/paddle/fluid/inference/tensorrt/plugin/emb_eltwise_layernorm_plugin.h @@ -27,14 +27,76 @@ namespace tensorrt { namespace plugin { #if IS_TRT_VERSION_GE(6000) + +class 
EmbEltwiseLayernormPluginDynamicImplBase { + public: + EmbEltwiseLayernormPluginDynamicImplBase() {} + virtual ~EmbEltwiseLayernormPluginDynamicImplBase() {} + + virtual int initialize() = 0; + virtual void terminate() = 0; + virtual int enqueue(const nvinfer1::PluginTensorDesc* inputDesc, + const nvinfer1::PluginTensorDesc* outputDesc, + const void* const* inputs, void* const* outputs, + void* workspace, cudaStream_t stream) = 0; +}; + template +class EmbEltwiseLayernormPluginDynamicImpl + : public EmbEltwiseLayernormPluginDynamicImplBase { + public: + explicit EmbEltwiseLayernormPluginDynamicImpl(std::vector input_embs, + float* bias, float* scale, + std::vector emb_sizes, + int bias_size, int scale_size, + int hidden_size, float eps) + : embs_(input_embs), + bias_(bias), + scale_(scale), + emb_sizes_(emb_sizes), + bias_size_(bias_size), + scale_size_(scale_size), + hidden_size_(hidden_size), + eps_(eps) {} + + ~EmbEltwiseLayernormPluginDynamicImpl(); + + int initialize(); + void terminate(); + int enqueue(const nvinfer1::PluginTensorDesc* inputDesc, + const nvinfer1::PluginTensorDesc* outputDesc, + const void* const* inputs, void* const* outputs, void* workspace, + cudaStream_t stream); + + private: + std::vector embs_; + float* bias_{nullptr}; + float* scale_{nullptr}; + + // data on devices + float* bias_gpu_{nullptr}; + float* scale_gpu_{nullptr}; + std::vector embs_gpu_; + + std::vector emb_sizes_; + int bias_size_; + int scale_size_; + int hidden_size_; + float eps_; + + framework::Tensor in_ptr_tensor_, emb_ptr_tensor_; + int device_id_{0}; + uintptr_t old_input_ptr_{0}; +}; + class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT { public: explicit EmbEltwiseLayernormPluginDynamic(std::vector input_embs, float* bias, float* scale, std::vector emb_sizes, int bias_size, int scale_size, - int hidden_size, float eps) + int hidden_size, float eps, + bool with_fp16) : embs_(input_embs), bias_(bias), scale_(scale), @@ -42,51 +104,81 @@ class 
EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT { bias_size_(bias_size), scale_size_(scale_size), hidden_size_(hidden_size), - eps_(eps) {} + eps_(eps), + with_fp16_(with_fp16), + own_host_buff_(false) { + if (with_fp16) { +#ifdef SUPPORTS_CUDA_FP16 + impl_ = new EmbEltwiseLayernormPluginDynamicImpl( + embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_, + hidden_size_, eps_); +#else + PADDLE_THROW(platform::errors::Fatal( + "Unsupported data type, current GPU doesn't support half.")); +#endif // SUPPORTS_CUDA_FP16 + } else { + impl_ = new EmbEltwiseLayernormPluginDynamicImpl( + embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_, + hidden_size_, eps_); + } + } EmbEltwiseLayernormPluginDynamic(void const* serial_data, - size_t serial_length) { + size_t serial_length) + : own_host_buff_(true) { DeserializeValue(&serial_data, &serial_length, &emb_sizes_); - embs_gpu_.resize(emb_sizes_.size()); embs_.resize(emb_sizes_.size()); for (size_t i = 0; i < emb_sizes_.size(); i++) { - cudaMalloc(&embs_gpu_[i], sizeof(float) * emb_sizes_[i]); - cudaMemcpy(embs_gpu_[i], serial_data, emb_sizes_[i] * sizeof(float), - cudaMemcpyHostToDevice); + auto size = emb_sizes_[i]; + auto ptr = new float[size]; + memcpy(ptr, serial_data, sizeof(float) * size); + embs_[i] = ptr; reinterpret_cast(serial_data) += emb_sizes_[i] * sizeof(float); serial_length -= emb_sizes_[i] * sizeof(float); - embs_[i] = nullptr; } DeserializeValue(&serial_data, &serial_length, &bias_size_); DeserializeValue(&serial_data, &serial_length, &scale_size_); - cudaMalloc(&bias_gpu_, sizeof(float) * bias_size_); - cudaMemcpy(bias_gpu_, serial_data, bias_size_ * sizeof(float), - cudaMemcpyHostToDevice); - bias_ = nullptr; + if (bias_size_) { + bias_ = new float[bias_size_]; + memcpy(bias_, serial_data, sizeof(float) * bias_size_); + } reinterpret_cast(serial_data) += bias_size_ * sizeof(float); serial_length -= bias_size_ * sizeof(float); - cudaMalloc(&scale_gpu_, sizeof(float) * 
scale_size_); - cudaMemcpy(scale_gpu_, serial_data, scale_size_ * sizeof(float), - cudaMemcpyHostToDevice); - scale_ = nullptr; + if (scale_size_) { + scale_ = new float[scale_size_]; + memcpy(scale_, serial_data, sizeof(float) * scale_size_); + } reinterpret_cast(serial_data) += scale_size_ * sizeof(float); serial_length -= scale_size_ * sizeof(float); DeserializeValue(&serial_data, &serial_length, &hidden_size_); DeserializeValue(&serial_data, &serial_length, &eps_); + DeserializeValue(&serial_data, &serial_length, &with_fp16_); + + if (with_fp16_) { +#ifdef SUPPORTS_CUDA_FP16 + impl_ = new EmbEltwiseLayernormPluginDynamicImpl( + embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_, + hidden_size_, eps_); +#else + PADDLE_THROW(platform::errors::Fatal( + "Unsupported data type, current GPU doesn't support half.")); +#endif // SUPPORTS_CUDA_FP16 + } else { + impl_ = new EmbEltwiseLayernormPluginDynamicImpl( + embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_, + hidden_size_, eps_); + } } nvinfer1::IPluginV2DynamicExt* clone() const override { auto ptr = new EmbEltwiseLayernormPluginDynamic( embs_, bias_, scale_, emb_sizes_, bias_size_, scale_size_, hidden_size_, - eps_); - ptr->embs_gpu_ = embs_gpu_; - ptr->bias_gpu_ = bias_gpu_; - ptr->scale_gpu_ = scale_gpu_; + eps_, with_fp16_); return ptr; } @@ -95,6 +187,7 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT { } int getNbOutputs() const override { return 1; } int initialize() override; + void terminate() override; size_t getSerializationSize() const override { int sum_num = 0; @@ -110,24 +203,32 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT { sum_num += (bias_size_ + scale_size_) * sizeof(float); sum_num += SerializedSize(hidden_size_); sum_num += SerializedSize(eps_); - // sum_num += SerializedSize(with_fp16_); + sum_num += SerializedSize(with_fp16_); return sum_num; } - void terminate() override; void serialize(void* buffer) const override { - // 
SerializeValue(&buffer, with_fp16_); SerializeValue(&buffer, emb_sizes_); for (size_t i = 0; i < emb_sizes_.size(); i++) { - SerializeCudaPointer(&buffer, embs_gpu_[i], emb_sizes_[i]); + auto size = emb_sizes_[i]; + for (int j = 0; j < size; ++j) { + SerializeValue(&buffer, embs_[i][j]); + } } SerializeValue(&buffer, bias_size_); SerializeValue(&buffer, scale_size_); - SerializeCudaPointer(&buffer, bias_gpu_, bias_size_); - SerializeCudaPointer(&buffer, scale_gpu_, scale_size_); + for (int i = 0; i < bias_size_; ++i) { + SerializeValue(&buffer, bias_[i]); + } + + for (int i = 0; i < scale_size_; ++i) { + SerializeValue(&buffer, scale_[i]); + } + SerializeValue(&buffer, hidden_size_); SerializeValue(&buffer, eps_); + SerializeValue(&buffer, with_fp16_); } nvinfer1::DimsExprs getOutputDimensions( @@ -158,23 +259,33 @@ class EmbEltwiseLayernormPluginDynamic : public DynamicPluginTensorRT { const nvinfer1::DataType* input_types, int nb_inputs) const override; - void destroy() override { delete this; } + void destroy() override { + if (own_host_buff_) { + for (auto ptr : embs_) { + delete[] ptr; + } + delete[] bias_; + delete[] scale_; + } + + delete impl_; + delete this; + } private: std::vector embs_; float* bias_; float* scale_; - // data on devices - float* bias_gpu_; - float* scale_gpu_; - std::vector embs_gpu_; - std::vector emb_sizes_; int bias_size_; int scale_size_; int hidden_size_; float eps_; + + bool with_fp16_; + bool own_host_buff_{false}; + EmbEltwiseLayernormPluginDynamicImplBase* impl_{nullptr}; }; class EmbEltwiseLayernormPluginV2Creator : public nvinfer1::IPluginCreator { @@ -198,8 +309,7 @@ class EmbEltwiseLayernormPluginV2Creator : public nvinfer1::IPluginCreator { nvinfer1::IPluginV2* deserializePlugin(const char* name, const void* serial_data, size_t serial_length) override { - return new EmbEltwiseLayernormPluginDynamic(serial_data, - serial_length); + return new EmbEltwiseLayernormPluginDynamic(serial_data, serial_length); } void 
setPluginNamespace(const char* lib_namespace) override { diff --git a/paddle/fluid/inference/tests/api/trt_dynamic_shape_ernie_deserialize_test.cc b/paddle/fluid/inference/tests/api/trt_dynamic_shape_ernie_deserialize_test.cc index 685f7b6600e4d73731860135469a3072d8ce7f9a..d49f83b9d38a3d16099c4bc698c47f18a4280da0 100644 --- a/paddle/fluid/inference/tests/api/trt_dynamic_shape_ernie_deserialize_test.cc +++ b/paddle/fluid/inference/tests/api/trt_dynamic_shape_ernie_deserialize_test.cc @@ -151,7 +151,7 @@ void trt_ernie(bool with_fp16, std::vector result) { run(config, &out_data); // serialize run(*config_deser, &out_data); // deserialize for (size_t i = 0; i < out_data.size(); i++) { - EXPECT_NEAR(result[i], out_data[i], 1e-6); + EXPECT_NEAR(result[i], out_data[i], 1e-2); } } @@ -159,13 +159,11 @@ TEST(AnalysisPredictor, no_fp16) { std::vector result = {0.597841, 0.219972, 0.182187}; trt_ernie(false, result); } - -TEST(AnalysisPredictor, fp16) { #ifdef SUPPORTS_CUDA_FP16 - std::vector result = {0.598336, 0.219558, 0.182106}; +TEST(AnalysisPredictor, fp16) { + std::vector result = {0.59923654, 0.21923761, 0.18152587}; trt_ernie(true, result); -#endif } - +#endif // SUPPORTS_CUDA_FP16 } // namespace inference } // namespace paddle diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index 3de577d847dd027c4778ccee760d5298501bad80..ac89116fc499d456e1fab8db030eda1c8fce9de2 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -121,6 +121,18 @@ function cmake_base() { else exit 1 fi + elif [ "$1" == "cp38-cp38" ]; then + if [ -d "/Library/Frameworks/Python.framework/Versions/3.8" ]; then + export LD_LIBRARY_PATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/ + export DYLD_LIBRARY_PATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/ + export PATH=/Library/Frameworks/Python.framework/Versions/3.8/bin/:${PATH} + 
PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/Library/Frameworks/Python.framework/Versions/3.8/bin/python3 + -DPYTHON_INCLUDE_DIR:PATH=/Library/Frameworks/Python.framework/Versions/3.8/include/python3.8/ + -DPYTHON_LIBRARY:FILEPATH=/Library/Frameworks/Python.framework/Versions/3.8/lib/libpython3.8.dylib" + pip3.8 install --user -r ${PADDLE_ROOT}/python/requirements.txt + else + exit 1 + fi fi # delete `gym` to avoid modifying requirements.txt in *.whl sed -i .bak "/^gym$/d" ${PADDLE_ROOT}/python/requirements.txt @@ -176,6 +188,13 @@ function cmake_base() { -DPYTHON_INCLUDE_DIR:PATH=/opt/_internal/cpython-3.7.0/include/python3.7m -DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-3.7.0/lib/libpython3.so" pip3.7 install -r ${PADDLE_ROOT}/python/requirements.txt + elif [ "$1" == "cp38-cp38" ]; then + export LD_LIBRARY_PATH=/opt/_internal/cpython-3.8.0/lib/:${LD_LIBRARY_PATH} + export PATH=/opt/_internal/cpython-3.8.0/bin/:${PATH} + export PYTHON_FLAGS="-DPYTHON_EXECUTABLE:FILEPATH=/opt/_internal/cpython-3.8.0/bin/python3.8 + -DPYTHON_INCLUDE_DIR:PATH=/opt/_internal/cpython-3.8.0/include/python3.8 + -DPYTHON_LIBRARIES:FILEPATH=/opt/_internal/cpython-3.8.0/lib/libpython3.so" + pip3.8 install -r ${PADDLE_ROOT}/python/requirements.txt fi else pip install -r ${PADDLE_ROOT}/python/requirements.txt @@ -514,6 +533,8 @@ EOF pip3.6 uninstall -y paddlepaddle elif [ "$1" == "cp37-cp37m" ]; then pip3.7 uninstall -y paddlepaddle + elif [ "$1" == "cp38-cp38" ]; then + pip3.8 uninstall -y paddlepaddle fi set -ex @@ -527,6 +548,8 @@ EOF pip3.6 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl elif [ "$1" == "cp37-cp37m" ]; then pip3.7 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl + elif [ "$1" == "cp38-cp38" ]; then + pip3.8 install --user ${INSTALL_PREFIX:-/paddle/build}/opt/paddle/share/wheels/*.whl fi tmpfile_rand=`date +%s%N` tmpfile=$tmp_dir/$tmpfile_rand @@ -666,7 +689,7 @@ function generate_api_spec() { awk -F '(' 
'{print $NF}' $spec_path >${spec_path}.doc awk -F '(' '{$NF="";print $0}' $spec_path >${spec_path}.api - if [ "$1" == "cp35-cp35m" ] || [ "$1" == "cp36-cp36m" ] || [ "$1" == "cp37-cp37m" ]; then + if [ "$1" == "cp35-cp35m" ] || [ "$1" == "cp36-cp36m" ] || [ "$1" == "cp37-cp37m" ] || [ "$1" == "cp38-cp38" ]; then # Use sed to make python2 and python3 sepc keeps the same sed -i 's/arg0: str/arg0: unicode/g' $spec_path sed -i "s/\(.*Transpiler.*\).__init__ (ArgSpec(args=\['self'].*/\1.__init__ /g" $spec_path @@ -1244,21 +1267,25 @@ EOF ref_paddle35=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp35-cp35m-linux_x86_64.whl ref_paddle36=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp36-cp36m-linux_x86_64.whl ref_paddle37=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp37-cp37m-linux_x86_64.whl + ref_paddle38=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp38-cp38-linux_x86_64.whl ref_paddle2_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp27-cp27mu-linux_x86_64.whl ref_paddle35_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp35-cp35m-linux_x86_64.whl ref_paddle36_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp36-cp36m-linux_x86_64.whl ref_paddle37_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp37-cp37m-linux_x86_64.whl + ref_paddle38_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}-cp38-cp38-linux_x86_64.whl if [[ ${PADDLE_BRANCH} != "0.0.0" && ${WITH_MKL} == "ON" && ${WITH_GPU} == "ON" ]]; then ref_paddle2=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp27-cp27mu-linux_x86_64.whl ref_paddle35=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp35-cp35m-linux_x86_64.whl ref_paddle36=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp36-cp36m-linux_x86_64.whl ref_paddle37=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp37-cp37m-linux_x86_64.whl + 
ref_paddle38=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp38-cp38-linux_x86_64.whl ref_paddle2_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp27-cp27mu-linux_x86_64.whl ref_paddle35_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp35-cp35m-linux_x86_64.whl ref_paddle36_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp36-cp36m-linux_x86_64.whl ref_paddle37_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp37-cp37m-linux_x86_64.whl + ref_paddle38_whl=paddlepaddle${install_gpu}-${PADDLE_BRANCH}.post${ref_CUDA_MAJOR}${CUDNN_MAJOR}-cp38-cp38-linux_x86_64.whl fi #ref_paddle2_mv1="" @@ -1363,6 +1390,22 @@ EOF apt-get clean -y && \ rm -f ${ref_paddle37} && \ ldconfig +EOF + cat >> ${PADDLE_ROOT}/build/Dockerfile < /dev/null && \ + make -j8 > /dev/null && make altinstall > /dev/null && cd ../ && rm Python-3.8.0.tgz + RUN apt-get install -y libgtk2.0-dev dmidecode python3-tk && ldconfig && \ + pip3.8 install opencv-python && wget ${ref_web}/${ref_paddle38} && pip3.8 install ${ref_paddle38_whl}; apt-get install -f -y && \ + apt-get clean -y && \ + rm -f ${ref_paddle38} && \ + ldconfig EOF cat >> ${PADDLE_ROOT}/build/Dockerfile < 0 and len(gateway[0]) > 1: + return gateway[0][1] + return "lo" + + def __get_default_iface_from_interfaces(self): + """ + get default physical interface + """ + import netifaces + for intf_name in netifaces.interfaces(): + addresses = netifaces.ifaddresses(intf_name) + if netifaces.AF_INET in addresses: + ipv4_addresses = addresses[netifaces.AF_INET] + for ipv4_address in ipv4_addresses: + if 'broadcast' in ipv4_address: + return intf_name + return "lo" + + def barrier(self, comm_world): + """ + dummy barrier, do nothing + """ + if not self._is_initialized: + warnings.warn(self._err_init) + return + + if comm_world not in self._comm_world: + raise 
ValueError(self._err_world) + + if comm_world == "worker": + self._worker_comm.barrier() + elif comm_world == "server": + self._server_comm.barrier() + else: + self._nodes_comm.barrier() + + def all_reduce(self, input, mode="sum", comm_world="worker"): + if not self._is_initialized: + warnings.warn(self._err_init) + return input + + if comm_world not in self._comm_world: + raise ValueError(self._err_world) + + input = np.array(input) + input_shape = input.shape + input_list = input.reshape(-1).tolist() + + self.barrier(comm_world) + + if comm_world == "worker": + ans = self._worker_comm.all_reduce(input_list, mode) + elif comm_world == "server": + ans = self._server_comm.all_reduce(input_list, mode) + else: + ans = self._nodes_comm.all_reduce(input_list, mode) + + output = np.array(ans).reshape(input_shape) + return output + + def all_gather(self, input, comm_world="worker"): + """ + dummy all gather, do nothing + Args: + obj(any): obj to do all gather + """ + if not self._is_initialized: + warnings.warn(self._err_init) + return input + + if comm_world not in self._comm_world: + raise ValueError(self._err_world) + + if comm_world == "worker": + output = self._worker_comm.all_gather(input) + elif comm_world == "server": + output = self._server_comm.all_gather(input) + else: + output = self._nodes_comm.all_gather(input) + + return output class RoleMakerBase(object): @@ -47,10 +361,6 @@ class RoleMakerBase(object): self._heter_trainer_device = "CPU" self._is_heter_parameter_server_mode = False - self._node_type = None - self._node_type_comm = None - self._all_comm = None - def is_worker(self): """ return is_worker() of current process @@ -142,19 +452,11 @@ class RoleMakerBase(object): self._role, self._current_id, self._worker_endpoints, self._server_endpoints) - def _all_gather(self, comm_world, input): - """ - - Args: - input(int|float): input value - - Returns: - return a list of values - """ - print("warning: RoleMakerBase does not have all gather.") + def 
_all_gather(self, input, comm_world="worker"): + print("warning: RoleMakerBase does not have all gather worker.") return None - def _all_reduce(self, comm_world, input, mode="sum"): + def _all_reduce(self, input, mode="sum", comm_world="worker"): """ Args: input(list/numpy.array): array of one dim @@ -221,73 +523,25 @@ class PaddleCloudRoleMaker(RoleMakerBase): def __init__(self, is_collective=False, **kwargs): super(PaddleCloudRoleMaker, self).__init__() self._is_collective = is_collective - self._init_gloo = False # default no init gloo - self._kwargs = kwargs + self._non_distributed = False + + self._kwargs = kwargs self._role_is_generated = False self._server_endpoints = None self._worker_endpoints = None - self._node_type_comm = None - self._all_comm = None - - self._non_distributed = False - - if not self._is_collective: - self._hdfs_name = kwargs.get("hdfs_name", "") - self._hdfs_ugi = kwargs.get("hdfs_ugi", "") - self._hdfs_path = kwargs.get("path", "").rstrip("/") - self._init_timeout_seconds = kwargs.get("init_timeout_seconds", - 3600) - self._run_timeout_seconds = kwargs.get("run_timeout_seconds", - 9999999) - ip_port = kwargs.get("http_ip_port", "") - self._http_ip_port = [] - self._http_server = None - # if ip_port is not empty, it will use http instead of hdfs - if ip_port != "": - self._http_ip_port = ip_port.split(":") - # it's for communication between processes - self._manager = Manager() - # global dict to store status - self._http_server_d = self._manager.dict() - # set running status of http server - self._http_server_d["running"] = False - self._iface = self.__get_default_iface() - # this environment variable can be empty - self._prefix = os.getenv("SYS_JOB_ID", "") + self._gloo = Gloo() # gloo instance def _barrier(self, comm_world): - if isinstance(comm_world, fluid.core.Gloo): - comm_world.barrier() - else: - print("warning: must init Gloo before using _barrier() function") - - def _all_gather(self, comm_world, input): - if 
isinstance(comm_world, fluid.core.Gloo): - self._barrier(comm_world) - output = comm_world.all_gather(input) - return output - else: - print("warning: must init Gloo before using _all_gather() function") - return None - - def _all_reduce(self, comm_world, input, mode="sum"): - if isinstance(comm_world, fluid.core.Gloo): - - input = np.array(input) + self._gloo.barrier(comm_world) - input_shape = input.shape - input_list = input.reshape(-1).tolist() + def _all_gather(self, input, comm_world="worker"): + return self._gloo.all_gather(input, comm_world) - self._barrier(comm_world) - ans = comm_world.all_reduce(input_list, mode) - output = np.array(ans).reshape(input_shape) - return output - else: - print("warning: must init Gloo before using _all_reduce() function") - return None + def _all_reduce(self, input, mode="sum", comm_world="worker"): + return self._gloo.all_reduce(input, mode, comm_world) def is_worker(self): """ @@ -349,7 +603,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): """ if not self._role_is_generated: self.generate_role() - return self._trainers_num + return len(self.get_pserver_endpoints()) def node_num(self): """ @@ -421,8 +675,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): # Environment variable PADDLE_PSERVERS_IP_PORT_LIST must be set # format: string(ip:port,ip:port), eg. 127.0.0.1:6001,127.0.0.1:6002 self._server_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST") - self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", - "").split(",") + if self._server_endpoints is None: # back to non_distributed execution. 
self._server_endpoints = "" @@ -436,6 +689,13 @@ class PaddleCloudRoleMaker(RoleMakerBase): return self._server_endpoints = self._server_endpoints.split(",") + + self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") + if self._worker_endpoints: + self._worker_endpoints = self._worker_endpoints.split(",") + else: + self._worker_endpoints = [] + trainers_num = int(os.environ["PADDLE_TRAINERS_NUM"]) training_role = os.environ["TRAINING_ROLE"] @@ -506,6 +766,7 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._current_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) self._training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER") assert (self._training_role == "TRAINER") + self._role = Role.WORKER self._worker_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS") self._cur_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT") if self._worker_endpoints is None: @@ -518,74 +779,64 @@ class PaddleCloudRoleMaker(RoleMakerBase): self._node_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) - def _init_gloo_env(self): - def init_gloo_instance(role="trainer"): - role = role.lower() - assert role in ["trainer", "pserver", "all"] - if role == "trainer": - all_list = self._worker_endpoints - rank = self._current_id - elif role == "pserver": - all_list = self._server_endpoints - rank = self._current_id - else: - all_list = self._worker_endpoints + self._server_endpoints - rank = all_list.index(self._cur_endpoint) - gloo = fluid.core.Gloo() - gloo.set_rank(rank) - gloo.set_size(len(all_list)) - gloo.set_prefix(self._prefix) - gloo.set_iface(self._iface) - gloo.set_timeout_seconds(self._init_timeout_seconds, - self._run_timeout_seconds) - if len(self._http_ip_port) != 0: - gloo.set_http_store(self._http_ip_port[0], - int(self._http_ip_port[1]), role) - else: - gloo.set_hdfs_store(self._hdfs_path + "/" + role, - self._hdfs_name, self._hdfs_ugi) - gloo.init() - return gloo - - # paddlecloud support gloo - if self._role == Role.WORKER: - if self._current_id == 0 and 
len(self._http_ip_port) != 0: - size_d = { - "trainer": len(self._worker_endpoints), - "pserver": len(self._server_endpoints), - "all": - len(self._worker_endpoints) + len(self._server_endpoints) - } - # child process for http server - self._http_server = Process( - target=self.__start_kv_server, - args=(self._http_server_d, size_d)) - self._http_server.daemon = True - # set running status to True - self._http_server_d["running"] = True - # start child process - self._http_server.start() - self._node_type = 1 - gloo = init_gloo_instance("trainer") - self._node_type_comm = gloo + def _gloo_init(self): + # PADDLE_WITH_GLOO 1: trainer barrier, 2: all barrier + use_gloo = int(os.getenv("PADDLE_WITH_GLOO", "0")) + if use_gloo not in [1, 2]: + return + + # PADDLE_GLOO_RENDEZVOUS 1: HDFS 2: FILE 3: HTTP + rendezvous_type = int(os.getenv("PADDLE_GLOO_RENDEZVOUS", "0")) + prefix = os.getenv("SYS_JOB_ID", "") + if rendezvous_type not in [ + Gloo.RENDEZVOUS.HDFS, Gloo.RENDEZVOUS.HTTP, Gloo.RENDEZVOUS.FILE + ]: + raise ValueError(self._gloo._err_type) + + need_init_all = True if use_gloo == 2 else False + + if rendezvous_type == Gloo.RENDEZVOUS.HDFS: + dfs_name = os.getenv("PADDLE_GLOO_FS_NAME", "") + dfs_ugi = os.getenv("PADDLE_GLOO_FS_UGI", "") + dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") + kwargs = { + "dfs.name": dfs_name, + "dfs.ugi": dfs_ugi, + "dfs.path": dfs_path, + "store.prefix": prefix, + } + elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: + ip = os.getenv("PADDLE_GLOO_HTTP_HOST", "") + port = os.getenv("PADDLE_GLOO_HTTP_PORT", "") + kwargs = { + "http.host": ip, + "http.port": port, + "store.prefix": prefix, + } else: - assert self._role == Role.SERVER - self._node_type = 0 - gloo = init_gloo_instance("pserver") - self._node_type_comm = gloo - - all_list = self._worker_endpoints + self._server_endpoints - self._rank = all_list.index(self._cur_endpoint) - self._size = len(all_list) - - gloo = init_gloo_instance("all") - self._all_comm = gloo - - if 
self._http_server is not None: - # set running status to False - self._http_server_d["running"] = False - # wait until child process exits - self._http_server.join() + dfs_path = os.getenv("PADDLE_GLOO_FS_PATH", "") + kwargs = { + "dfs.path": dfs_path, + "store.prefix": prefix, + } + + if rendezvous_type == Gloo.RENDEZVOUS.HDFS: + type = "HDFS" + elif rendezvous_type == Gloo.RENDEZVOUS.HTTP: + type = "HTTP" + else: + type = "FILE" + print("Gloo init with {}: need_init_all: {}, args: {}".format( + type, need_init_all, kwargs)) + + self._gloo.init( + rendezvous=rendezvous_type, + role=self._role, + role_id=self.role_id(), + worker_num=self.worker_num(), + server_num=self.server_num(), + need_init_all=need_init_all, + kwargs=kwargs) def generate_role(self): """ @@ -594,57 +845,10 @@ class PaddleCloudRoleMaker(RoleMakerBase): if not self._role_is_generated: if not self._is_collective: self._ps_env() - if "PADDLE_WITH_GLOO" in os.environ: - self._init_gloo = bool(os.environ["PADDLE_WITH_GLOO"]) - if self._init_gloo: - self._init_gloo_env() else: self._collective_env() self._role_is_generated = True - - def __get_default_iface(self): - """ - get default physical interface - """ - default1 = self.__get_default_iface_from_gateway() - default2 = self.__get_default_iface_from_interfaces() - return default2 if default1 == "lo" else default1 - - def __get_default_iface_from_gateway(self): - """ - get default physical interface - """ - import netifaces - gateways = netifaces.gateways() - if gateways.get(netifaces.AF_INET) != None: - gateway = gateways[netifaces.AF_INET] - if len(gateway) > 0 and len(gateway[0]) > 1: - return gateway[0][1] - return "lo" - - def __get_default_iface_from_interfaces(self): - """ - get default physical interface - """ - import netifaces - for intf_name in netifaces.interfaces(): - addresses = netifaces.ifaddresses(intf_name) - if netifaces.AF_INET in addresses: - ipv4_addresses = addresses[netifaces.AF_INET] - for ipv4_address in ipv4_addresses: - 
if 'broadcast' in ipv4_address: - return intf_name - return "lo" - - def __start_kv_server(self, http_server_d, size_d): - from paddle.distributed.fleet.utils.http_server import KVServer - http_server = KVServer(int(self._http_ip_port[1]), size_d) - http_server.start() - wait_seconds = 5 - while http_server_d.get("running", - False) and not http_server.shoud_stop(): - time.sleep(wait_seconds) - http_server.stop() + self._gloo_init() class UserDefinedRoleMaker(PaddleCloudRoleMaker): @@ -677,7 +881,7 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): self._worker_endpoints = self._kwargs.get("worker_endpoints") self._current_id = self._kwargs.get("current_id") self._trainers_num = len(self._worker_endpoints) - self._training_role = Role.Worker + self._training_role = Role.WORKER self._node_num = len( set([x.split(':')[0] for x in self._worker_endpoints])) @@ -688,8 +892,6 @@ class UserDefinedRoleMaker(PaddleCloudRoleMaker): if not self._role_is_generated: if not self._is_collective: self._user_defined_ps_env() - if self._init_gloo: - self._init_gloo_env() else: self._user_defined_collective_env() self._role_is_generated = True diff --git a/python/paddle/distributed/fleet/base/util_factory.py b/python/paddle/distributed/fleet/base/util_factory.py index 4fa247c319616d0a2f1ffbc2d26753dbd278f12f..e822c3c92f47396388079dda649d299872cfc96d 100644 --- a/python/paddle/distributed/fleet/base/util_factory.py +++ b/python/paddle/distributed/fleet/base/util_factory.py @@ -57,34 +57,7 @@ class UtilBase(object): ), "fs_client must be the instance of paddle.distributed.fleet.utils.FS" self.fs_client = fs_client - def __check_comm_world(self, comm_world="worker"): - if not self.role_maker._role_is_generated: - self.role_maker.generate_role() - - _comm_world = None - comm_world_upper = comm_world.upper() - if comm_world_upper == "WORKER": - if not self.role_maker.is_worker(): - print( - "warning: current role is not worker in collective_func(comm_world=\"worker\")" - ) - _comm_world 
= self.role_maker._node_type_comm - elif comm_world_upper == "SERVER": - if not self.role_maker.is_server(): - print( - "warning: current role is not server in collective_func(comm_world=\"server\")" - ) - _comm_world = self.role_maker._node_type_comm - elif comm_world_upper == "ALL": - _comm_world = self.role_maker._all_comm - else: - raise ValueError( - "not support comm_world, please choose one from [worker, server, all]" - ) - - return _comm_world - - def all_reduce(self, input, mode, comm_world="worker"): + def all_reduce(self, input, mode="sum", comm_world="worker"): """ All reduce `input` between specified collection. This is a distributed API. @@ -130,8 +103,7 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - return self.role_maker._all_reduce(_comm_world, input, mode) + return self.role_maker._all_reduce(input, mode, comm_world) def barrier(self, comm_world="worker"): """ @@ -170,8 +142,7 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - self.role_maker._barrier(_comm_world) + self.role_maker._barrier(comm_world) def all_gather(self, input, comm_world="worker"): """ @@ -219,8 +190,8 @@ class UtilBase(object): if __name__ == "__main__": train() """ - _comm_world = self.__check_comm_world(comm_world) - return self.role_maker._all_gather(_comm_world, input) + + return self.role_maker._all_gather(input, comm_world) def _broadcast(self): pass diff --git a/python/paddle/distributed/fleet/launch.py b/python/paddle/distributed/fleet/launch.py index a527393f6024b27095819e256538a39a240ca7cb..4b629bc35ce59da9af0b72a2ab4ee44e587a86f1 100644 --- a/python/paddle/distributed/fleet/launch.py +++ b/python/paddle/distributed/fleet/launch.py @@ -55,7 +55,10 @@ launch a process on each of the given gpu card or cpu machine. 
""" from __future__ import print_function + +import shutil import sys +import tempfile from sys import version import subprocess import os @@ -213,12 +216,20 @@ def launch_collective(args): cluster, pod = get_cluster_from_args(args, gpus) logger.debug("get cluster from args:{}".format(cluster)) + global_envs = copy.copy(os.environ.copy()) + gloo_rendezvous_dir = tempfile.mkdtemp() + # add gloo env + global_envs["PADDLE_WITH_GLOO"] = "1" + global_envs["PADDLE_GLOO_RENDEZVOUS"] = "2" + global_envs["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + procs = start_local_trainers( cluster, pod, training_script=args.training_script, training_script_args=args.training_script_args, - log_dir=args.log_dir) + log_dir=args.log_dir, + envs=global_envs) while True: alive = watch_local_trainers(procs, cluster.trainers_nranks()) @@ -230,6 +241,9 @@ def launch_collective(args): time.sleep(3) + if os.path.exists(gloo_rendezvous_dir): + shutil.rmtree(gloo_rendezvous_dir) + def launch_ps(args): ports = None @@ -315,6 +329,13 @@ def launch_ps(args): default_env = os.environ.copy() current_env = copy.copy(default_env) + + gloo_rendezvous_dir = tempfile.mkdtemp() + # add gloo env + current_env["PADDLE_WITH_GLOO"] = "1" + current_env["PADDLE_GLOO_RENDEZVOUS"] = "2" + current_env["PADDLE_GLOO_FS_PATH"] = gloo_rendezvous_dir + current_env.pop("http_proxy", None) current_env.pop("https_proxy", None) procs = [] @@ -419,6 +440,9 @@ def launch_ps(args): procs[i].proc.terminate() print("all parameter server are killed", file=sys.stderr) + if os.path.exists(gloo_rendezvous_dir): + shutil.rmtree(gloo_rendezvous_dir) + def launch(): args = _parse_args() diff --git a/python/paddle/distributed/fleet/launch_utils.py b/python/paddle/distributed/fleet/launch_utils.py index b6f4c75a276920f966a6b324a9bea16148bf337c..17d3b96cf4466e560381c20fe265b39cac6697f0 100644 --- a/python/paddle/distributed/fleet/launch_utils.py +++ b/python/paddle/distributed/fleet/launch_utils.py @@ -398,8 +398,14 @@ def 
start_local_trainers(cluster, pod, training_script, training_script_args, - log_dir=None): - current_env = copy.copy(os.environ.copy()) + log_dir=None, + envs=None): + + if envs is None: + current_env = copy.copy(os.environ.copy()) + else: + current_env = copy.copy(envs) + #paddle broadcast ncclUniqueId use socket, and #proxy maybe make trainers unreachable, so delete them. #if we set them to "", grpc will log error message "bad uri" diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base.py b/python/paddle/fluid/tests/unittests/test_fleet_base.py index 4ced9841ee43e02a3d1e3f292bf97200dec29f5c..3a90b363f2744f421bfab8eb4d55dd2c6e51e7e9 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_base.py @@ -27,7 +27,7 @@ class TestFleetBase(unittest.TestCase): os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" os.environ["PADDLE_TRAINERS_NUM"] = "2" os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \ - "127.0.0.1:36001,127.0.0.2:36001" + "127.0.0.1:36001,127.0.0.2:36001" def test_init(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) @@ -88,7 +88,7 @@ class TestFleetBase(unittest.TestCase): def test_util(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - self.assertEqual(fleet.util, None) + self.assertEqual(fleet.util(), None) def test_barrier_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) @@ -99,20 +99,17 @@ class TestFleetBase(unittest.TestCase): def test_init_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - if fleet.is_worker(): - fleet.init_worker() - def test_run_server(self): - role = role_maker.PaddleCloudRoleMaker(is_collective=True) - fleet.init(role) - if fleet.is_worker(): - fleet.run_worker() + with self.assertRaises(ValueError): + if fleet.is_worker(): + fleet.init_worker() def test_stop_worker(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) 
fleet.init(role) - if fleet.is_worker(): - fleet.stop_worker() + with self.assertRaises(ValueError): + if fleet.is_worker(): + fleet.stop_worker() def test_distributed_optimizer(self): role = role_maker.PaddleCloudRoleMaker(is_collective=True) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py index cf9b3e1e9a1605a714b47d99183511b24c903722..d786fa1eba8901f53ac76a47632f63f6fb6641eb 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_new.py @@ -15,7 +15,11 @@ from __future__ import print_function import os +import platform +import shutil +import tempfile import unittest +import paddle import paddle.distributed.fleet.base.role_maker as role_maker @@ -42,9 +46,9 @@ class TestRoleMakerBase(unittest.TestCase): self.assertTrue(len(pserver_endpoints) == 0) print(role.to_string()) - self.assertTrue(role._all_gather(role._node_type_comm, 1) is None) - self.assertTrue(role._all_reduce(role._node_type_comm, 1) is None) - role._barrier(role._node_type_comm) + self.assertTrue(role._all_gather(1, "worker") is None) + self.assertTrue(role._all_reduce(1, "sum", "worker") is None) + role._barrier("worker") class TestCloudRoleMaker(unittest.TestCase): @@ -72,8 +76,8 @@ class TestCloudRoleMaker(unittest.TestCase): print("warning: no netifaces, skip test_tr_rolemaker") return - ro = role_maker.PaddleCloudRoleMaker( - is_collective=False, init_gloo=False) + ro = role_maker.PaddleCloudRoleMaker(is_collective=False) + self.assertTrue(ro.is_worker()) self.assertFalse(ro.is_server()) self.assertEqual(ro.worker_num(), 2) @@ -108,8 +112,9 @@ class TestCloudRoleMaker(unittest.TestCase): self.assertEqual(ro.server_num(), 2) pserver_endpoints = ro.get_pserver_endpoints() self.assertEqual(pserver_endpoints[0], '127.0.0.1:36001') - self.assertTrue(ro._all_gather(ro._all_comm, 1) is None) - 
self.assertTrue(ro._all_reduce(ro._all_comm, 1) is None) + + self.assertEqual(ro._all_gather(1, "worker"), 1) + self.assertEqual(ro._all_reduce(1, "sum", "worker"), 1) def test_traing_role(self): """Test training role.""" @@ -142,7 +147,7 @@ class TestUserDefinedRoleMaker(unittest.TestCase): ro = role_maker.UserDefinedRoleMaker( is_collective=False, init_gloo=False, - server_endpoints="127.0.0.1:36001,127.0.0.1:36001", + server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"], role=role_maker.Role.SERVER, current_id=0, worker_num=2) @@ -161,14 +166,274 @@ class TestUserDefinedRoleMaker(unittest.TestCase): ro = role_maker.UserDefinedRoleMaker( is_collective=False, init_gloo=False, - server_endpoints="127.0.0.1:36001,127.0.0.1:36001", + server_endpoints=["127.0.0.1:36001", "127.0.0.1:36001"], role=role_maker.Role.WORKER, current_id=0, worker_num=2) + self.assertIn("127.0.0.1:36001", ro.get_pserver_endpoints()) self.assertTrue(ro.is_worker()) self.assertEqual(ro.role_id(), 0) +class TestGlooWithCloudRoleMaker(unittest.TestCase): + def setUp(self): + os.environ["PADDLE_TRAINERS_NUM"] = "1" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_TRAINER_ID"] = "0" + + def case(self, role, comm_world): + role._barrier(comm_world) + + gather = role._all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = role._all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + def mkdir(self): + tmp = tempfile.mkdtemp() + return tmp + + def clean(self, tmp): + shutil.rmtree(tmp) + + def test_hdfs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + 
os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "worker") + self.clean(tmp) + + def test_fs_gloo2(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo3(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = 
role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.clean(tmp) + + def test_fs_gloo4(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "3" + os.environ["PADDLE_GLOO_HTTP_HOST"] = "127.0.0.1" + os.environ["PADDLE_GLOO_HTTP_PORT"] = "30019" + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + import time + time.sleep(3) + + def test_fs_gloo5(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "2" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo6(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] 
= "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + role = role_maker.PaddleCloudRoleMaker() + role.generate_role() + self.case(role, "server") + self.case(role, "all") + self.clean(tmp) + + def test_fs_gloo7(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "1" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "5" + + role = role_maker.PaddleCloudRoleMaker() + self.assertRaises(ValueError, role.generate_role) + + def test_fs_gloo8(self): + plats = platform.platform() + if 'Linux' not in plats: + print("skip gloo UT on MacOS/Win") + return + + tmp = self.mkdir() + + os.environ["TRAINING_ROLE"] = "PSERVER" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001" + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["PADDLE_TRAINERS_NUM"] = "0" + + os.environ["SYS_JOB_ID"] = "gloo_for_cluster" + + os.environ["PADDLE_WITH_GLOO"] = "2" + os.environ["PADDLE_GLOO_RENDEZVOUS"] = "1" + os.environ["PADDLE_GLOO_FS_NAME"] = "NULL" + os.environ["PADDLE_GLOO_FS_UGI"] = "NULL" + os.environ["PADDLE_GLOO_FS_PATH"] = tmp + + def net(): + x = paddle.fluid.layers.data(name='x', shape=[13], dtype='float32') + y_predict = paddle.fluid.layers.fc(input=x, size=1, act=None) + y = paddle.fluid.layers.data(name='y', shape=[1], dtype='float32') + cost = paddle.fluid.layers.square_error_cost( + input=y_predict, label=y) + avg_cost = paddle.fluid.layers.mean(cost) + return avg_cost + + from paddle.distributed import fleet + + role = role_maker.PaddleCloudRoleMaker() + fleet.init(role) + avg_cost = net() + + strategy = 
paddle.distributed.fleet.DistributedStrategy() + strategy.a_sync = False + + optimizer = paddle.optimizer.SGD(0.01) + optimizer = fleet.distributed_optimizer(optimizer, strategy) + optimizer.minimize(avg_cost) + + comm_world = "server" + fleet.util().barrier(comm_world) + + gather = fleet.util().all_gather(1, comm_world) + self.assertEqual(gather[0], 1) + + all_reduce = fleet.util().all_reduce(1, "sum", comm_world) + self.assertEqual(1, all_reduce) + + self.clean(tmp) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_util.py b/python/paddle/fluid/tests/unittests/test_fleet_util.py index d506088fde0291c1aab7204f5b3ba1a1ab19aa3f..1570912e7406f930212eead64305e1e35e1b8ac0 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_util.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_util.py @@ -59,7 +59,7 @@ class TestFleetUtil(unittest.TestCase): import paddle.distributed.fleet.base.role_maker as role_maker role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) - default_util = fleet.util + default_util = fleet.util() self.assertEqual(default_util, None) def test_set_user_defined_util(self): @@ -76,8 +76,8 @@ class TestFleetUtil(unittest.TestCase): role = role_maker.PaddleCloudRoleMaker(is_collective=True) fleet.init(role) my_util = UserDefinedUtil() - fleet.util = my_util - user_id = fleet.util.get_user_id() + fleet.set_util(my_util) + user_id = fleet.util().get_user_id() self.assertEqual(user_id, 10) def test_fs(self): @@ -88,97 +88,6 @@ class TestFleetUtil(unittest.TestCase): self.assertFalse(fs.need_upload_download()) fleet_util._set_file_system(fs) - def test_barrier(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_barrier") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_barrier", "", "") - gloo.init() - - role = 
role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.SERVER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - role._node_type_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - fleet_util.barrier("worker") - - def test_all_reduce(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_all_reduce") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "") - gloo.init() - - role = role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.WORKER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - role._node_type_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - output = fleet_util.all_reduce(1, "sum", comm_world="server") - print(output) - - # self.assertEqual(output, 1) - - def test_all_gather(self): - try: - import netifaces - except: - print("warning: no netifaces, skip test_all_gather") - return - - gloo = fluid.core.Gloo() - gloo.set_rank(0) - gloo.set_size(1) - gloo.set_prefix("123") - gloo.set_iface("lo") - gloo.set_hdfs_store("./tmp_test_fleet_reduce", "", "") - gloo.init() - - role = role_maker.UserDefinedRoleMaker( - is_collective=False, - init_gloo=False, - current_id=0, - role=role_maker.Role.SERVER, - worker_endpoints=["127.0.0.1:6003"], - server_endpoints=["127.0.0.1:6001"]) - role._node_type_comm = gloo - role._all_comm = gloo - role._role_is_generated = True - fleet_util._set_role_maker(role) - - output = fleet_util.all_gather(1, comm_world="all") - print(output) - # self.assertTrue(len(output) == 1 and output[0] == 1) - self.assertRaises(Exception, fleet_util.all_gather, 1, "test") - def download_files(self): path = download(self.proto_data_url, self.module_name, 
self.proto_data_md5) diff --git a/python/paddle/fluid/tests/unittests/test_transformer_api.py b/python/paddle/fluid/tests/unittests/test_transformer_api.py index 5fea9f69a18c83be0f6af05784735ea53d0993d2..bd76edc9d8cadf14c6cf224b7708ff4acd6efef4 100644 --- a/python/paddle/fluid/tests/unittests/test_transformer_api.py +++ b/python/paddle/fluid/tests/unittests/test_transformer_api.py @@ -474,6 +474,141 @@ class TestTransformer(unittest.TestCase): trans_output = transformer(src, tgt, src_mask, tgt_mask, memory_mask) + def test_transformer_attr_1(self): + batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params( + mode="decoder_layer") + + # batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8 + with fluid.dygraph.guard(fluid.CPUPlace()): + transformer = Transformer( + d_model, + n_head, + dim_feedforward=dim_feedforward, + dropout=dropout, + weight_attr=[None], + bias_attr=[False]) + src = paddle.to_variable( + np.random.rand(batch_size, source_length, d_model).astype( + "float32")) + tgt = paddle.to_variable( + np.random.rand(batch_size, target_length, d_model).astype( + "float32")) + src_mask = np.zeros((batch_size, n_head, source_length, + source_length)).astype("float32") + src_mask[0][0][0][0] = -np.inf + src_mask = paddle.to_variable(src_mask) + tgt_mask = np.zeros((batch_size, n_head, target_length, + target_length)).astype("float32") + tgt_mask[0][0][0][0] = -1e9 + memory_mask = np.zeros((batch_size, n_head, target_length, + source_length)).astype("float32") + memory_mask[0][0][0][0] = -1e9 + tgt_mask, memory_mask = paddle.to_variable( + tgt_mask), paddle.to_variable(memory_mask) + trans_output = transformer(src, tgt, src_mask, tgt_mask, + memory_mask) + + def test_transformer_attr_2(self): + batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params( + mode="decoder_layer") + + # batch_size, source_length, target_length, 
d_model, n_head = 4, 8, 8, 64, 8 + with fluid.dygraph.guard(fluid.CPUPlace()): + transformer = Transformer( + d_model, + n_head, + dim_feedforward=dim_feedforward, + dropout=dropout, + weight_attr=[None, None], + bias_attr=[False, False]) + src = paddle.to_variable( + np.random.rand(batch_size, source_length, d_model).astype( + "float32")) + tgt = paddle.to_variable( + np.random.rand(batch_size, target_length, d_model).astype( + "float32")) + src_mask = np.zeros((batch_size, n_head, source_length, + source_length)).astype("float32") + src_mask[0][0][0][0] = -np.inf + src_mask = paddle.to_variable(src_mask) + tgt_mask = np.zeros((batch_size, n_head, target_length, + target_length)).astype("float32") + tgt_mask[0][0][0][0] = -1e9 + memory_mask = np.zeros((batch_size, n_head, target_length, + source_length)).astype("float32") + memory_mask[0][0][0][0] = -1e9 + tgt_mask, memory_mask = paddle.to_variable( + tgt_mask), paddle.to_variable(memory_mask) + trans_output = transformer(src, tgt, src_mask, tgt_mask, + memory_mask) + + def test_transformer_attr_3(self): + batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params( + mode="decoder_layer") + + # batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8 + with fluid.dygraph.guard(fluid.CPUPlace()): + transformer = Transformer( + d_model, + n_head, + dim_feedforward=dim_feedforward, + dropout=dropout, + weight_attr=[None, None, None], + bias_attr=[False, False, True]) + src = paddle.to_variable( + np.random.rand(batch_size, source_length, d_model).astype( + "float32")) + tgt = paddle.to_variable( + np.random.rand(batch_size, target_length, d_model).astype( + "float32")) + src_mask = np.zeros((batch_size, n_head, source_length, + source_length)).astype("float32") + src_mask[0][0][0][0] = -np.inf + src_mask = paddle.to_variable(src_mask) + tgt_mask = np.zeros((batch_size, n_head, target_length, + target_length)).astype("float32") + 
tgt_mask[0][0][0][0] = -1e9 + memory_mask = np.zeros((batch_size, n_head, target_length, + source_length)).astype("float32") + memory_mask[0][0][0][0] = -1e9 + tgt_mask, memory_mask = paddle.to_variable( + tgt_mask), paddle.to_variable(memory_mask) + trans_output = transformer(src, tgt, src_mask, tgt_mask, + memory_mask) + + def test_transformer_attr_boolean(self): + batch_size, d_model, n_head, dim_feedforward, dropout, _, _, source_length, target_length = generate_basic_params( + mode="decoder_layer") + + # batch_size, source_length, target_length, d_model, n_head = 4, 8, 8, 64, 8 + with fluid.dygraph.guard(fluid.CPUPlace()): + transformer = Transformer( + d_model, + n_head, + dim_feedforward=dim_feedforward, + dropout=dropout, + bias_attr=False) + src = paddle.to_variable( + np.random.rand(batch_size, source_length, d_model).astype( + "float32")) + tgt = paddle.to_variable( + np.random.rand(batch_size, target_length, d_model).astype( + "float32")) + src_mask = np.zeros((batch_size, n_head, source_length, + source_length)).astype("float32") + src_mask[0][0][0][0] = -np.inf + src_mask = paddle.to_variable(src_mask) + tgt_mask = np.zeros((batch_size, n_head, target_length, + target_length)).astype("float32") + tgt_mask[0][0][0][0] = -1e9 + memory_mask = np.zeros((batch_size, n_head, target_length, + source_length)).astype("float32") + memory_mask[0][0][0][0] = -1e9 + tgt_mask, memory_mask = paddle.to_variable( + tgt_mask), paddle.to_variable(memory_mask) + trans_output = transformer(src, tgt, src_mask, tgt_mask, + memory_mask) + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/nn/layer/transformer.py b/python/paddle/nn/layer/transformer.py index 63069e83952172df3136458ebfee4b446749934d..4b199d5816c808d4975c51bc154ad21d46f135eb 100644 --- a/python/paddle/nn/layer/transformer.py +++ b/python/paddle/nn/layer/transformer.py @@ -53,7 +53,22 @@ def _convert_param_attr_to_list(param_attr, n): if isinstance(param_attr, (list, tuple)): assert 
len(param_attr) == n, ( "length of param_attr should be %d when it is a list/tuple" % n) - param_attrs = [ParamAttr._to_attr(attr) for attr in param_attr] + param_attrs = [] + for attr in param_attr: + if isinstance(attr, bool): + if attr: + param_attrs.append(ParamAttr._to_attr(None)) + else: + param_attrs.append(False) + else: + param_attrs.append(ParamAttr._to_attr(attr)) + # param_attrs = [ParamAttr._to_attr(attr) for attr in param_attr] + elif isinstance(param_attr, bool): + param_attrs = [] + if param_attr: + param_attrs = [ParamAttr._to_attr(None) for i in range(n)] + else: + param_attrs = [False] * n else: param_attrs = [] attr = ParamAttr._to_attr(param_attr) @@ -417,7 +432,7 @@ class TransformerEncoderLayer(Layer): Otherwise, MHA and FFN both use it as `weight_attr` to create parameters. Default: None, which means the default weight parameter property is used. See usage for details in :code:`ParamAttr` . - bias_attr (ParamAttr|tuple, optional): To specify the bias parameter property. + bias_attr (ParamAttr|tuple|bool, optional): To specify the bias parameter property. If it is a tuple, `bias_attr[0]` would be used as `bias_attr` for MHA, and `bias_attr[1]` would be used as `bias_attr` for linear in FFN. Otherwise, MHA and FFN both use it as `bias_attr` to create parameters. @@ -986,22 +1001,31 @@ class Transformer(Layer): Otherwise, no pre-process and post-precess includes dropout, residual connection, layer normalization. Default False weight_attr(ParamAttr|tuple, optional): To specify the weight parameter property. - If it is a tuple, `weight_attr[0]` would be used as `weight_attr` for - self attention, `weight_attr[1]` would be used as `weight_attr` for - cross attention, and `weight_attr[2]` would be used as `weight_attr` - for linear in FFN. Otherwise, the three sub-layers all uses it as - `weight_attr` to create parameters. Default: None, which means the - default weight parameter property is used. 
See usage for details + If it is a tuple, the length of `weight_attr` could be 1, 2 or 3. If it is 3, + `weight_attr[0]` would be used as `weight_attr` for self attention, `weight_attr[1]` + would be used as `weight_attr` for cross attention of `TransformerDecoder`, + and `weight_attr[2]` would be used as `weight_attr` for linear in FFN. + If it is 2, `weight_attr[0]` would be used as `weight_attr` both for self attention + and cross attention and `weight_attr[1]` would be used as `weight_attr` for + linear in FFN. If it is 1, `weight_attr[0]` would be used as `weight_attr` + for self attention, cross attention and linear in FFN. Otherwise, + the three sub-layers all uses it as `weight_attr` to create parameters. + Default: None, which means the default weight parameter property is used. + See usage for details in :code:`ParamAttr` . bias_attr (ParamAttr|tuple, optional): To specify the bias parameter property. - If it is a tuple, `bias_attr[0]` would be used as `bias_attr` for - self attention, `bias_attr[1]` would be used as `bias_attr` for - cross attention, and `bias_attr[2]` would be used as `bias_attr` - for linear in FFN. Otherwise, the three sub-layers all uses it as - `bias_attr` to create parameters. The `False` value means the - corresponding layer would not have trainable bias parameter. See - usage for details in :code:`ParamAttr` . Default: None,which means - the default bias parameter property is used. + If it is a tuple, the length of `bias_attr` could be 1, 2 or 3. If it is 3, + `bias_attr[0]` would be used as `bias_attr` for self attention, `bias_attr[1]` + would be used as `bias_attr` for cross attention of `TransformerDecoder`, + and `bias_attr[2]` would be used as `bias_attr` for linear in FFN. + If it is 2, `bias_attr[0]` would be used as `bias_attr` both for self attention + and cross attention and `bias_attr[1]` would be used as `bias_attr` for + linear in FFN.
If it is 1, `bias_attr[0]` would be used as `bias_attr` + for self attention, cross attention and linear in FFN. Otherwise, + the three sub-layers all uses it as `bias_attr` to create parameters. + The `False` value means the corresponding layer would not have trainable + bias parameter. See usage for details in :code:`ParamAttr` . + Default: None,which means the default bias parameter property is used. custom_encoder (Layer): If custom encoder is provided, use it as the encoder. Default None custom_decoder (Layer): If custom decoder is provided, use it as the decoder. @@ -1049,13 +1073,51 @@ class Transformer(Layer): custom_decoder=None): super(Transformer, self).__init__() + if isinstance(bias_attr, (list, tuple)): + if len(bias_attr) == 1: + encoder_bias_attr = [bias_attr[0]] * 2 + decoder_bias_attr = [bias_attr[0]] * 3 + elif len(bias_attr) == 2: + encoder_bias_attr = bias_attr + decoder_bias_attr = [bias_attr[0], bias_attr[0], bias_attr[-1]] + elif len(bias_attr) == 3: + encoder_bias_attr = [bias_attr[0], bias_attr[-1]] + decoder_bias_attr = bias_attr + else: + assert False, ( + "length of bias_attr should be 1 or 2 or 3 when it is a list/tuple" + ) + else: + encoder_bias_attr = bias_attr + decoder_bias_attr = bias_attr + + if isinstance(weight_attr, (list, tuple)): + if len(weight_attr) == 1: + encoder_weight_attr = [weight_attr[0]] * 2 + decoder_weight_attr = [weight_attr[0]] * 3 + elif len(weight_attr) == 2: + encoder_weight_attr = weight_attr + decoder_weight_attr = [ + weight_attr[0], weight_attr[0], weight_attr[-1] + ] + elif len(weight_attr) == 3: + encoder_weight_attr = [weight_attr[0], weight_attr[-1]] + decoder_weight_attr = weight_attr + else: + assert False, ( + "length of weight_attr should be 1 or 2 or 3 when it is a list/tuple" + ) + else: + encoder_weight_attr = weight_attr + decoder_weight_attr = weight_attr + if custom_encoder is not None: self.encoder = custom_encoder else: encoder_layer = TransformerEncoderLayer( d_model, nhead, 
dim_feedforward, dropout, activation, - attn_dropout, act_dropout, normalize_before, weight_attr, - bias_attr) + attn_dropout, act_dropout, normalize_before, + encoder_weight_attr, encoder_bias_attr) encoder_norm = LayerNorm(d_model) self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm) @@ -1065,8 +1127,8 @@ class Transformer(Layer): else: decoder_layer = TransformerDecoderLayer( d_model, nhead, dim_feedforward, dropout, activation, - attn_dropout, act_dropout, normalize_before, weight_attr, - bias_attr) + attn_dropout, act_dropout, normalize_before, + decoder_weight_attr, decoder_bias_attr) decoder_norm = LayerNorm(d_model) self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm)