提交 66406619 编写于 作者: N nhzlx

merge develop

...@@ -8,6 +8,7 @@ set(ANAKIN_INCLUDE "${ANAKIN_INSTALL_DIR}" CACHE STRING "root of Anakin header f ...@@ -8,6 +8,7 @@ set(ANAKIN_INCLUDE "${ANAKIN_INSTALL_DIR}" CACHE STRING "root of Anakin header f
set(ANAKIN_LIBRARY "${ANAKIN_INSTALL_DIR}" CACHE STRING "path of Anakin library") set(ANAKIN_LIBRARY "${ANAKIN_INSTALL_DIR}" CACHE STRING "path of Anakin library")
set(ANAKIN_COMPILE_EXTRA_FLAGS set(ANAKIN_COMPILE_EXTRA_FLAGS
-Wno-error=unused-but-set-variable -Wno-unused-but-set-variable
-Wno-error=unused-variable -Wno-unused-variable -Wno-error=unused-variable -Wno-unused-variable
-Wno-error=format-extra-args -Wno-format-extra-args -Wno-error=format-extra-args -Wno-format-extra-args
-Wno-error=comment -Wno-comment -Wno-error=comment -Wno-comment
...@@ -19,7 +20,7 @@ set(ANAKIN_COMPILE_EXTRA_FLAGS ...@@ -19,7 +20,7 @@ set(ANAKIN_COMPILE_EXTRA_FLAGS
-Wno-reorder -Wno-reorder
-Wno-error=cpp) -Wno-error=cpp)
set(ANAKIN_LIBRARY_URL "https://github.com/pangge/Anakin/releases/download/3.0/anakin_release_simple.tar.gz") set(ANAKIN_LIBRARY_URL "https://github.com/pangge/Anakin/releases/download/Version0.1.0/anakin.tar.gz")
# A helper function used in Anakin, currently, to use it, one need to recursively include # A helper function used in Anakin, currently, to use it, one need to recursively include
# nearly all the header files. # nearly all the header files.
...@@ -41,9 +42,9 @@ if (NOT EXISTS "${ANAKIN_INSTALL_DIR}") ...@@ -41,9 +42,9 @@ if (NOT EXISTS "${ANAKIN_INSTALL_DIR}")
message(STATUS "Download Anakin library from ${ANAKIN_LIBRARY_URL}") message(STATUS "Download Anakin library from ${ANAKIN_LIBRARY_URL}")
execute_process(COMMAND bash -c "mkdir -p ${ANAKIN_INSTALL_DIR}") execute_process(COMMAND bash -c "mkdir -p ${ANAKIN_INSTALL_DIR}")
execute_process(COMMAND bash -c "rm -rf ${ANAKIN_INSTALL_DIR}/*") execute_process(COMMAND bash -c "rm -rf ${ANAKIN_INSTALL_DIR}/*")
execute_process(COMMAND bash -c "cd ${ANAKIN_INSTALL_DIR}; wget -q ${ANAKIN_LIBRARY_URL}") execute_process(COMMAND bash -c "cd ${ANAKIN_INSTALL_DIR}; wget --no-check-certificate -q ${ANAKIN_LIBRARY_URL}")
execute_process(COMMAND bash -c "mkdir -p ${ANAKIN_INSTALL_DIR}") execute_process(COMMAND bash -c "mkdir -p ${ANAKIN_INSTALL_DIR}")
execute_process(COMMAND bash -c "cd ${ANAKIN_INSTALL_DIR}; tar xzf anakin_release_simple.tar.gz") execute_process(COMMAND bash -c "cd ${ANAKIN_INSTALL_DIR}; tar xzf anakin.tar.gz")
endif() endif()
if (WITH_ANAKIN) if (WITH_ANAKIN)
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#pragma once #pragma once
#include <string>
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/inference/analysis/data_flow_graph.h" #include "paddle/fluid/inference/analysis/data_flow_graph.h"
......
...@@ -176,7 +176,7 @@ struct GraphTraits<DataFlowGraph> { ...@@ -176,7 +176,7 @@ struct GraphTraits<DataFlowGraph> {
// sub-graph is the inputs nodes and output nodes that doesn't inside the // sub-graph is the inputs nodes and output nodes that doesn't inside the
// sub-graph. // sub-graph.
std::pair<std::vector<Node *>, std::vector<Node *>> std::pair<std::vector<Node *>, std::vector<Node *>>
ExtractInputAndOutputOfSubGraph(std::vector<Node *> &graph); ExtractInputAndOutputOfSubGraph(std::vector<Node *> &graph); // NOLINT
} // namespace analysis } // namespace analysis
} // namespace inference } // namespace inference
......
...@@ -12,11 +12,13 @@ ...@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "paddle/fluid/inference/analysis/model_store_pass.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string>
#include "paddle/fluid/inference/analysis/analyzer.h" #include "paddle/fluid/inference/analysis/analyzer.h"
#include "paddle/fluid/inference/analysis/argument.h" #include "paddle/fluid/inference/analysis/argument.h"
#include "paddle/fluid/inference/analysis/model_store_pass.h"
namespace paddle { namespace paddle {
namespace inference { namespace inference {
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
* model in the disk, and that model can be reloaded for prediction. * model in the disk, and that model can be reloaded for prediction.
*/ */
#pragma once
#include <string>
#include "paddle/fluid/inference/analysis/pass.h" #include "paddle/fluid/inference/analysis/pass.h"
namespace paddle { namespace paddle {
......
...@@ -19,6 +19,7 @@ endif(APPLE) ...@@ -19,6 +19,7 @@ endif(APPLE)
set(inference_deps paddle_inference_api paddle_fluid_api) set(inference_deps paddle_inference_api paddle_fluid_api)
if(WITH_GPU AND TENSORRT_FOUND) if(WITH_GPU AND TENSORRT_FOUND)
set(inference_deps ${inference_deps} paddle_inference_tensorrt_subgraph_engine) set(inference_deps ${inference_deps} paddle_inference_tensorrt_subgraph_engine)
endif() endif()
...@@ -63,6 +64,8 @@ endif() ...@@ -63,6 +64,8 @@ endif()
if (WITH_ANAKIN) # only needed in CI if (WITH_ANAKIN) # only needed in CI
# Due to Anakin do not have official library releases and the versions of protobuf and cuda do not match Paddle's, # Due to Anakin do not have official library releases and the versions of protobuf and cuda do not match Paddle's,
# so anakin library will not be merged to our official inference library. To use anakin prediction API, one need to # so anakin library will not be merged to our official inference library. To use anakin prediction API, one need to
# compile the libinference_anakin_api.a and compile with anakin.so.
fetch_include_recursively(${ANAKIN_INCLUDE})
# compile the libinference_anakin_api.a and anakin.so. # compile the libinference_anakin_api.a and anakin.so.
nv_library(inference_anakin_api SRCS api.cc api_anakin_engine.cc) nv_library(inference_anakin_api SRCS api.cc api_anakin_engine.cc)
nv_library(inference_anakin_api_shared SHARED SRCS api.cc api_anakin_engine.cc) nv_library(inference_anakin_api_shared SHARED SRCS api.cc api_anakin_engine.cc)
...@@ -73,7 +76,7 @@ if (WITH_ANAKIN) # only needed in CI ...@@ -73,7 +76,7 @@ if (WITH_ANAKIN) # only needed in CI
if (WITH_TESTING) if (WITH_TESTING)
cc_test(inference_anakin_test SRCS api_anakin_engine_tester.cc cc_test(inference_anakin_test SRCS api_anakin_engine_tester.cc
ARGS --model=${ANAKIN_INSTALL_DIR}/mobilenet_v2.anakin.bin ARGS --model=${ANAKIN_INSTALL_DIR}/mobilenet_v2.anakin.bin
DEPS inference_anakin_api) DEPS inference_anakin_api_shared)
target_compile_options(inference_anakin_test BEFORE PUBLIC ${ANAKIN_COMPILE_EXTRA_FLAGS}) target_compile_options(inference_anakin_test BEFORE PUBLIC ${ANAKIN_COMPILE_EXTRA_FLAGS})
endif(WITH_TESTING) endif(WITH_TESTING)
endif() endif()
...@@ -18,26 +18,36 @@ ...@@ -18,26 +18,36 @@
namespace paddle { namespace paddle {
PaddleInferenceAnakinPredictor::PaddleInferenceAnakinPredictor( template <typename Target>
PaddleInferenceAnakinPredictor<Target>::PaddleInferenceAnakinPredictor(
const AnakinConfig &config) { const AnakinConfig &config) {
CHECK(Init(config)); CHECK(Init(config));
} }
bool PaddleInferenceAnakinPredictor::Init(const AnakinConfig &config) { template <typename Target>
bool PaddleInferenceAnakinPredictor<Target>::Init(const AnakinConfig &config) {
if (!(graph_.load(config.model_file))) { if (!(graph_.load(config.model_file))) {
LOG(FATAL) << "fail to load graph from " << config.model_file;
return false; return false;
} }
graph_.ResetBatchSize("input_0", config.max_batch_size); auto inputs = graph_.get_ins();
for (auto &input_str : inputs) {
graph_.ResetBatchSize(input_str, config.max_batch_size);
}
// optimization for graph // optimization for graph
if (!(graph_.Optimize())) { if (!(graph_.Optimize())) {
return false; return false;
} }
// construct executer // construct executer
executor_.init(graph_); if (executor_p_ == nullptr) {
executor_p_ = new anakin::Net<Target, anakin::saber::AK_FLOAT,
anakin::Precision::FP32>(graph_, true);
}
return true; return true;
} }
bool PaddleInferenceAnakinPredictor::Run( template <typename Target>
bool PaddleInferenceAnakinPredictor<Target>::Run(
const std::vector<PaddleTensor> &inputs, const std::vector<PaddleTensor> &inputs,
std::vector<PaddleTensor> *output_data, int batch_size) { std::vector<PaddleTensor> *output_data, int batch_size) {
for (const auto &input : inputs) { for (const auto &input : inputs) {
...@@ -46,7 +56,29 @@ bool PaddleInferenceAnakinPredictor::Run( ...@@ -46,7 +56,29 @@ bool PaddleInferenceAnakinPredictor::Run(
<< "'s type is not float"; << "'s type is not float";
return false; return false;
} }
auto d_tensor_in_p = executor_.get_in(input.name); auto d_tensor_in_p = executor_p_->get_in(input.name);
auto net_shape = d_tensor_in_p->valid_shape();
if (net_shape.size() != input.shape.size()) {
LOG(ERROR) << " input " << input.name
<< "'s shape size should be equal to that of net";
return false;
}
int sum = 1;
for_each(input.shape.begin(), input.shape.end(), [&](int n) { sum *= n; });
if (sum > net_shape.count()) {
graph_.Reshape(input.name, input.shape);
delete executor_p_;
executor_p_ = new anakin::Net<Target, anakin::saber::AK_FLOAT,
anakin::Precision::FP32>(graph_, true);
d_tensor_in_p = executor_p_->get_in(input.name);
}
anakin::saber::Shape tmp_shape;
for (auto s : input.shape) {
tmp_shape.push_back(s);
}
d_tensor_in_p->reshape(tmp_shape);
float *d_data_p = d_tensor_in_p->mutable_data(); float *d_data_p = d_tensor_in_p->mutable_data();
if (cudaMemcpy(d_data_p, static_cast<float *>(input.data.data()), if (cudaMemcpy(d_data_p, static_cast<float *>(input.data.data()),
d_tensor_in_p->valid_size() * sizeof(float), d_tensor_in_p->valid_size() * sizeof(float),
...@@ -56,16 +88,17 @@ bool PaddleInferenceAnakinPredictor::Run( ...@@ -56,16 +88,17 @@ bool PaddleInferenceAnakinPredictor::Run(
} }
cudaStreamSynchronize(NULL); cudaStreamSynchronize(NULL);
} }
cudaDeviceSynchronize();
executor_.prediction(); executor_p_->prediction();
cudaDeviceSynchronize();
if (output_data->empty()) { if (output_data->empty()) {
LOG(ERROR) << "At least one output should be set with tensors' names."; LOG(ERROR) << "At least one output should be set with tensors' names.";
return false; return false;
} }
for (auto &output : *output_data) { for (auto &output : *output_data) {
auto *tensor = executor_.get_out(output.name); auto *tensor = executor_p_->get_out(output.name);
output.shape = tensor->shape(); output.shape = tensor->valid_shape();
if (output.data.length() < tensor->valid_size() * sizeof(float)) { if (output.data.length() < tensor->valid_size() * sizeof(float)) {
output.data.Resize(tensor->valid_size() * sizeof(float)); output.data.Resize(tensor->valid_size() * sizeof(float));
} }
...@@ -81,19 +114,23 @@ bool PaddleInferenceAnakinPredictor::Run( ...@@ -81,19 +114,23 @@ bool PaddleInferenceAnakinPredictor::Run(
return true; return true;
} }
anakin::Net<anakin::NV, anakin::saber::AK_FLOAT, anakin::Precision::FP32> template <typename Target>
&PaddleInferenceAnakinPredictor::get_executer() { anakin::Net<Target, anakin::saber::AK_FLOAT, anakin::Precision::FP32>
return executor_; &PaddleInferenceAnakinPredictor<Target>::get_executer() {
return *executor_p_;
} }
// the cloned new Predictor of anakin share the same net weights from original // the cloned new Predictor of anakin share the same net weights from original
// Predictor // Predictor
std::unique_ptr<PaddlePredictor> PaddleInferenceAnakinPredictor::Clone() { template <typename Target>
std::unique_ptr<PaddlePredictor>
PaddleInferenceAnakinPredictor<Target>::Clone() {
VLOG(3) << "Anakin Predictor::clone"; VLOG(3) << "Anakin Predictor::clone";
std::unique_ptr<PaddlePredictor> cls(new PaddleInferenceAnakinPredictor()); std::unique_ptr<PaddlePredictor> cls(
new PaddleInferenceAnakinPredictor<Target>());
// construct executer from other graph // construct executer from other graph
auto anakin_predictor_p = auto anakin_predictor_p =
dynamic_cast<PaddleInferenceAnakinPredictor *>(cls.get()); dynamic_cast<PaddleInferenceAnakinPredictor<Target> *>(cls.get());
if (!anakin_predictor_p) { if (!anakin_predictor_p) {
LOG(ERROR) << "fail to call Init"; LOG(ERROR) << "fail to call Init";
return nullptr; return nullptr;
...@@ -103,14 +140,28 @@ std::unique_ptr<PaddlePredictor> PaddleInferenceAnakinPredictor::Clone() { ...@@ -103,14 +140,28 @@ std::unique_ptr<PaddlePredictor> PaddleInferenceAnakinPredictor::Clone() {
return std::move(cls); return std::move(cls);
} }
template class PaddleInferenceAnakinPredictor<anakin::NV>;
template class PaddleInferenceAnakinPredictor<anakin::X86>;
// A factory to help create difference predictor. // A factory to help create difference predictor.
template <> template <>
std::unique_ptr<PaddlePredictor> CreatePaddlePredictor< std::unique_ptr<PaddlePredictor> CreatePaddlePredictor<
AnakinConfig, PaddleEngineKind::kAnakin>(const AnakinConfig &config) { AnakinConfig, PaddleEngineKind::kAnakin>(const AnakinConfig &config) {
VLOG(3) << "Anakin Predictor create."; VLOG(3) << "Anakin Predictor create.";
std::unique_ptr<PaddlePredictor> x( if (config.target_type == AnakinConfig::NVGPU) {
new PaddleInferenceAnakinPredictor(config)); VLOG(3) << "Anakin Predictor create on [ NVIDIA GPU ].";
return x; std::unique_ptr<PaddlePredictor> x(
} new PaddleInferenceAnakinPredictor<anakin::NV>(config));
return x;
} else if (config.target_type == AnakinConfig::X86) {
VLOG(3) << "Anakin Predictor create on [ Intel X86 ].";
std::unique_ptr<PaddlePredictor> x(
new PaddleInferenceAnakinPredictor<anakin::X86>(config));
return x;
} else {
VLOG(3) << "Anakin Predictor create on unknown platform.";
return nullptr;
}
};
} // namespace paddle } // namespace paddle
...@@ -20,14 +20,16 @@ limitations under the License. */ ...@@ -20,14 +20,16 @@ limitations under the License. */
#pragma once #pragma once
#include <vector> #include <vector>
#include "paddle/fluid/inference/api/paddle_inference_api.h"
// from anakin
#include "framework/core/net/net.h" #include "framework/core/net/net.h"
#include "framework/graph/graph.h"
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#include "saber/core/shape.h"
#include "saber/saber_types.h" #include "saber/saber_types.h"
namespace paddle { namespace paddle {
template <typename Target>
class PaddleInferenceAnakinPredictor : public PaddlePredictor { class PaddleInferenceAnakinPredictor : public PaddlePredictor {
public: public:
PaddleInferenceAnakinPredictor() {} PaddleInferenceAnakinPredictor() {}
...@@ -42,19 +44,21 @@ class PaddleInferenceAnakinPredictor : public PaddlePredictor { ...@@ -42,19 +44,21 @@ class PaddleInferenceAnakinPredictor : public PaddlePredictor {
std::unique_ptr<PaddlePredictor> Clone() override; std::unique_ptr<PaddlePredictor> Clone() override;
anakin::Net<anakin::NV, anakin::saber::AK_FLOAT, anakin::Precision::FP32>& anakin::Net<Target, anakin::saber::AK_FLOAT, anakin::Precision::FP32>&
get_executer(); get_executer();
~PaddleInferenceAnakinPredictor() override{}; ~PaddleInferenceAnakinPredictor() override {
delete executor_p_;
executor_p_ = nullptr;
};
private: private:
bool Init(const AnakinConfig& config); bool Init(const AnakinConfig& config);
anakin::graph::Graph<anakin::NV, anakin::saber::AK_FLOAT, anakin::graph::Graph<Target, anakin::saber::AK_FLOAT, anakin::Precision::FP32>
anakin::Precision::FP32>
graph_; graph_;
anakin::Net<anakin::NV, anakin::saber::AK_FLOAT, anakin::Precision::FP32> anakin::Net<Target, anakin::saber::AK_FLOAT, anakin::Precision::FP32>*
executor_; executor_p_{nullptr};
AnakinConfig config_; AnakinConfig config_;
}; };
......
...@@ -12,18 +12,20 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,18 +12,20 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <gflags/gflags.h>
#include <glog/logging.h> #include <glog/logging.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "gflags/gflags.h"
#include "paddle/fluid/inference/api/paddle_inference_api.h" #include "paddle/fluid/inference/api/paddle_inference_api.h"
DEFINE_string(model, "", "Directory of the inference model."); DEFINE_string(model, "", "Directory of the inference model(mobile_v2).");
namespace paddle { namespace paddle {
AnakinConfig GetConfig() { AnakinConfig GetConfig() {
AnakinConfig config; AnakinConfig config;
// using AnakinConfig::X86 if you need to use cpu to do inference
config.target_type = AnakinConfig::NVGPU;
config.model_file = FLAGS_model; config.model_file = FLAGS_model;
config.device = 0; config.device = 0;
config.max_batch_size = 1; config.max_batch_size = 1;
...@@ -36,7 +38,6 @@ TEST(inference, anakin) { ...@@ -36,7 +38,6 @@ TEST(inference, anakin) {
CreatePaddlePredictor<AnakinConfig, PaddleEngineKind::kAnakin>(config); CreatePaddlePredictor<AnakinConfig, PaddleEngineKind::kAnakin>(config);
float data[1 * 3 * 224 * 224] = {1.0f}; float data[1 * 3 * 224 * 224] = {1.0f};
PaddleTensor tensor; PaddleTensor tensor;
tensor.name = "input_0"; tensor.name = "input_0";
tensor.shape = std::vector<int>({1, 3, 224, 224}); tensor.shape = std::vector<int>({1, 3, 224, 224});
...@@ -44,22 +45,20 @@ TEST(inference, anakin) { ...@@ -44,22 +45,20 @@ TEST(inference, anakin) {
tensor.dtype = PaddleDType::FLOAT32; tensor.dtype = PaddleDType::FLOAT32;
// For simplicity, we set all the slots with the same data. // For simplicity, we set all the slots with the same data.
std::vector<PaddleTensor> paddle_tensor_feeds; std::vector<PaddleTensor> paddle_tensor_feeds(1, tensor);
paddle_tensor_feeds.emplace_back(std::move(tensor));
PaddleTensor tensor_out; PaddleTensor tensor_out;
tensor_out.name = "prob_out"; tensor_out.name = "prob_out";
tensor_out.shape = std::vector<int>({1000, 1}); tensor_out.shape = std::vector<int>({});
tensor_out.data = PaddleBuf(); tensor_out.data = PaddleBuf();
tensor_out.dtype = PaddleDType::FLOAT32; tensor_out.dtype = PaddleDType::FLOAT32;
std::vector<PaddleTensor> outputs; std::vector<PaddleTensor> outputs(1, tensor_out);
outputs.emplace_back(std::move(tensor_out));
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs)); ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
float* data_o = static_cast<float*>(outputs[0].data.data()); float* data_o = static_cast<float*>(outputs[0].data.data());
for (size_t j = 0; j < 1000; ++j) { for (size_t j = 0; j < outputs[0].data.length(); ++j) {
LOG(INFO) << "output[" << j << "]: " << data_o[j]; LOG(INFO) << "output[" << j << "]: " << data_o[j];
} }
} }
......
...@@ -20,8 +20,8 @@ limitations under the License. */ ...@@ -20,8 +20,8 @@ limitations under the License. */
#include <glog/logging.h> // use glog instead of PADDLE_ENFORCE to avoid importing other paddle header files. #include <glog/logging.h> // use glog instead of PADDLE_ENFORCE to avoid importing other paddle header files.
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include "paddle/fluid/inference/demo_ci/utils.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
#include "utils.h"
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
DECLARE_double(fraction_of_gpu_memory_to_use); DECLARE_double(fraction_of_gpu_memory_to_use);
......
...@@ -44,7 +44,7 @@ class PaddleBuf { ...@@ -44,7 +44,7 @@ class PaddleBuf {
PaddleBuf(void* data, size_t length) PaddleBuf(void* data, size_t length)
: data_(data), length_(length), memory_owned_{false} {} : data_(data), length_(length), memory_owned_{false} {}
// Own memory. // Own memory.
PaddleBuf(size_t length) explicit PaddleBuf(size_t length)
: data_(new char[length]), length_(length), memory_owned_(true) {} : data_(new char[length]), length_(length), memory_owned_(true) {}
// Resize to `length` bytes. // Resize to `length` bytes.
void Resize(size_t length); void Resize(size_t length);
...@@ -126,9 +126,11 @@ struct NativeConfig : public PaddlePredictor::Config { ...@@ -126,9 +126,11 @@ struct NativeConfig : public PaddlePredictor::Config {
// Configurations for Anakin engine. // Configurations for Anakin engine.
struct AnakinConfig : public PaddlePredictor::Config { struct AnakinConfig : public PaddlePredictor::Config {
enum TargetType { NVGPU = 0, X86 };
int device; int device;
std::string model_file; std::string model_file;
int max_batch_size{-1}; int max_batch_size{-1};
TargetType target_type;
}; };
struct TensorRTConfig : public NativeConfig { struct TensorRTConfig : public NativeConfig {
......
...@@ -38,7 +38,7 @@ void Reorder2(nvinfer1::DimsHW shape, const T* idata, nvinfer1::DimsHW istrides, ...@@ -38,7 +38,7 @@ void Reorder2(nvinfer1::DimsHW shape, const T* idata, nvinfer1::DimsHW istrides,
} }
// indata c * k // indata c * k
// Reorder the data layout from CK to KC. // Reorder the data layout from CK to KC.
void ReorderCKtoKC(TensorRTEngine::Weight& iweights, void ReorderCKtoKC(TensorRTEngine::Weight& iweights, // NOLINT
TensorRTEngine::Weight* oweights) { TensorRTEngine::Weight* oweights) {
int c = iweights.dims[0]; int c = iweights.dims[0];
int k = iweights.dims[1]; int k = iweights.dims[1];
......
...@@ -55,7 +55,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -55,7 +55,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireSrcMemoryFromWeightsPrimitive( std::shared_ptr<mkldnn::memory> AcquireSrcMemoryFromWeightsPrimitive(
const std::shared_ptr<mkldnn::memory> user_memory_p, const std::shared_ptr<mkldnn::memory> user_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto src_pd = conv_bwd_weights_pd_->src_primitive_desc(); auto src_pd = conv_bwd_weights_pd_->src_primitive_desc();
auto user_pd = user_memory_p->get_primitive_desc(); auto user_pd = user_memory_p->get_primitive_desc();
return this->AcquireMemory(src_pd, user_pd, user_memory_p, return this->AcquireMemory(src_pd, user_pd, user_memory_p,
...@@ -64,7 +64,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -64,7 +64,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireDiffDstMemoryFromWeightsPrimitive( std::shared_ptr<mkldnn::memory> AcquireDiffDstMemoryFromWeightsPrimitive(
const std::shared_ptr<mkldnn::memory> user_memory_p, const std::shared_ptr<mkldnn::memory> user_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto diff_dst_pd = conv_bwd_weights_pd_->diff_dst_primitive_desc(); auto diff_dst_pd = conv_bwd_weights_pd_->diff_dst_primitive_desc();
auto user_pd = user_memory_p->get_primitive_desc(); auto user_pd = user_memory_p->get_primitive_desc();
return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p, return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p,
...@@ -80,7 +80,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -80,7 +80,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireDiffDstMemoryFromDataPrimitive( std::shared_ptr<mkldnn::memory> AcquireDiffDstMemoryFromDataPrimitive(
const std::shared_ptr<mkldnn::memory> user_memory_p, const std::shared_ptr<mkldnn::memory> user_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto diff_dst_pd = conv_bwd_data_pd_->diff_dst_primitive_desc(); auto diff_dst_pd = conv_bwd_data_pd_->diff_dst_primitive_desc();
auto user_pd = user_memory_p->get_primitive_desc(); auto user_pd = user_memory_p->get_primitive_desc();
return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p, return this->AcquireMemory(diff_dst_pd, user_pd, user_memory_p,
...@@ -89,7 +89,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -89,7 +89,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireWeightsMemoryFromDataPrimitive( std::shared_ptr<mkldnn::memory> AcquireWeightsMemoryFromDataPrimitive(
const std::shared_ptr<mkldnn::memory> user_weights_memory_p, const std::shared_ptr<mkldnn::memory> user_weights_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto weights_pd = conv_bwd_data_pd_->weights_primitive_desc(); auto weights_pd = conv_bwd_data_pd_->weights_primitive_desc();
auto user_pd = user_weights_memory_p->get_primitive_desc(); auto user_pd = user_weights_memory_p->get_primitive_desc();
return this->AcquireMemory(weights_pd, user_pd, user_weights_memory_p, return this->AcquireMemory(weights_pd, user_pd, user_weights_memory_p,
...@@ -109,7 +109,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -109,7 +109,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireSrcMemoryFromPrimitive( std::shared_ptr<mkldnn::memory> AcquireSrcMemoryFromPrimitive(
const std::shared_ptr<mkldnn::memory> user_memory_p, const std::shared_ptr<mkldnn::memory> user_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto src_pd = conv_pd_->src_primitive_desc(); auto src_pd = conv_pd_->src_primitive_desc();
auto user_pd = user_memory_p->get_primitive_desc(); auto user_pd = user_memory_p->get_primitive_desc();
return this->AcquireMemory(src_pd, user_pd, user_memory_p, "@src_mem_p", return this->AcquireMemory(src_pd, user_pd, user_memory_p, "@src_mem_p",
...@@ -118,7 +118,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -118,7 +118,7 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
std::shared_ptr<mkldnn::memory> AcquireWeightsMemoryFromPrimitive( std::shared_ptr<mkldnn::memory> AcquireWeightsMemoryFromPrimitive(
const std::shared_ptr<mkldnn::memory> user_weights_memory_p, const std::shared_ptr<mkldnn::memory> user_weights_memory_p,
std::vector<mkldnn::primitive>& pipeline) { std::vector<mkldnn::primitive>& pipeline) { // NOLINT
auto user_weights_pd = user_weights_memory_p->get_primitive_desc(); auto user_weights_pd = user_weights_memory_p->get_primitive_desc();
auto weights_pd = conv_pd_->weights_primitive_desc(); auto weights_pd = conv_pd_->weights_primitive_desc();
return this->AcquireMemory(weights_pd, user_weights_pd, return this->AcquireMemory(weights_pd, user_weights_pd,
...@@ -197,12 +197,12 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler { ...@@ -197,12 +197,12 @@ class ConvMKLDNNHandler : public platform::MKLDNNHandler {
// Generate keys for storing/retriving primitives for this operator // Generate keys for storing/retriving primitives for this operator
// TODO(jczaja): Make hashing function more optimial // TODO(jczaja): Make hashing function more optimial
static std::string GetHash(memory::dims& input_dims, static std::string GetHash(memory::dims& input_dims, // NOLINT
memory::dims& weights_dims, memory::dims& weights_dims, // NOLINT
std::vector<int>& strides, std::vector<int>& strides, // NOLINT
std::vector<int>& paddings, std::vector<int>& paddings, // NOLINT
std::vector<int>& dilations, int groups, std::vector<int>& dilations, // NOLINT
const std::string& suffix) { int groups, const std::string& suffix) {
return dims2str(input_dims) + dims2str(weights_dims) + dims2str(strides) + return dims2str(input_dims) + dims2str(weights_dims) + dims2str(strides) +
dims2str(paddings) + dims2str(dilations) + std::to_string(groups) + dims2str(paddings) + dims2str(dilations) + std::to_string(groups) +
suffix; suffix;
......
...@@ -121,7 +121,7 @@ class ParallelExecutor(object): ...@@ -121,7 +121,7 @@ class ParallelExecutor(object):
else: else:
cpu_num = int( cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exec_strategy.num_threads = cpu_num exec_strategy.num_threads = cpu_num * 2
if build_strategy is None: if build_strategy is None:
build_strategy = BuildStrategy() build_strategy = BuildStrategy()
......
...@@ -49,6 +49,7 @@ list(REMOVE_ITEM TEST_OPS test_dist_train) ...@@ -49,6 +49,7 @@ list(REMOVE_ITEM TEST_OPS test_dist_train)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
list(REMOVE_ITEM TEST_OPS test_dist_se_resnext) list(REMOVE_ITEM TEST_OPS test_dist_se_resnext)
list(REMOVE_ITEM TEST_OPS test_dist_transformer)
foreach(TEST_OP ${TEST_OPS}) foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP}) py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach(TEST_OP) endforeach(TEST_OP)
...@@ -61,4 +62,5 @@ if(WITH_DISTRIBUTE) ...@@ -61,4 +62,5 @@ if(WITH_DISTRIBUTE)
endif() endif()
py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL) py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL)
py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL) py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)
py_test_modules(test_dist_transformer MODULES test_dist_transformer SERIAL)
py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext SERIAL) py_test_modules(test_dist_se_resnext MODULES test_dist_se_resnext SERIAL)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import time
import math
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
import os
import sys
import transformer_model
import paddle.dataset.wmt16 as wmt16
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
WMT16_RECORDIO_FILE = "/tmp/wmt16.recordio"
class ModelHyperParams(object):
# Dictionary size for source and target language. This model directly uses
# paddle.dataset.wmt16 in which <bos>, <eos> and <unk> token has
# alreay been added, but the <pad> token is not added. Transformer requires
# sequences in a mini-batch are padded to have the same length. A <pad> token is
# added into the original dictionary in paddle.dateset.wmt16.
# size of source word dictionary.
src_vocab_size = 10000
# index for <pad> token in source language.
src_pad_idx = src_vocab_size
# size of target word dictionay
trg_vocab_size = 10000
# index for <pad> token in target language.
trg_pad_idx = trg_vocab_size
# position value corresponding to the <pad> token.
pos_pad_idx = 0
# max length of sequences. It should plus 1 to include position
# padding token for position encoding.
max_length = 50
# the dimension for word embeddings, which is also the last dimension of
# the input and output of multi-head attention, position-wise feed-forward
# networks, encoder and decoder.
d_model = 512
# size of the hidden layer in position-wise feed-forward networks.
d_inner_hid = 1024
# the dimension that keys are projected to for dot-product attention.
d_key = 64
# the dimension that values are projected to for dot-product attention.
d_value = 64
# number of head used in multi-head attention.
n_head = 8
# number of sub-layers to be stacked in the encoder and decoder.
n_layer = 6
# dropout rate used by all dropout layers.
dropout = 0.1
def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias. Then, convert the numpy
data to tensors and return a dict mapping names to tensors.
"""
def __pad_batch_data(insts,
pad_idx,
is_target=False,
return_pos=True,
return_attn_bias=True,
return_max_len=True):
"""
Pad the instances to the max sequence length in batch, and generate the
corresponding position data and attention bias.
"""
return_list = []
max_len = max(len(inst) for inst in insts)
inst_data = np.array(
[inst + [pad_idx] * (max_len - len(inst)) for inst in insts])
return_list += [inst_data.astype("int64").reshape([-1, 1])]
if return_pos:
inst_pos = np.array([[
pos_i + 1 if w_i != pad_idx else 0
for pos_i, w_i in enumerate(inst)
] for inst in inst_data])
return_list += [inst_pos.astype("int64").reshape([-1, 1])]
if return_attn_bias:
if is_target:
# This is used to avoid attention on paddings and subsequent
# words.
slf_attn_bias_data = np.ones((inst_data.shape[0], max_len,
max_len))
slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape(
[-1, 1, max_len, max_len])
slf_attn_bias_data = np.tile(slf_attn_bias_data,
[1, n_head, 1, 1]) * [-1e9]
else:
# This is used to avoid attention on paddings.
slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] *
(max_len - len(inst))
for inst in insts])
slf_attn_bias_data = np.tile(
slf_attn_bias_data.reshape([-1, 1, 1, max_len]),
[1, n_head, max_len, 1])
return_list += [slf_attn_bias_data.astype("float32")]
if return_max_len:
return_list += [max_len]
return return_list if len(return_list) > 1 else return_list[0]
src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data(
[inst[0] for inst in insts], src_pad_idx, is_target=False)
trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data(
[inst[1] for inst in insts], trg_pad_idx, is_target=True)
trg_src_attn_bias = np.tile(src_slf_attn_bias[:, :, ::src_max_len, :],
[1, 1, trg_max_len, 1]).astype("float32")
lbl_word = __pad_batch_data([inst[2] for inst in insts], trg_pad_idx, False,
False, False, False)
lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1])
return [
src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias,
trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight
]
def transformer(use_feed):
assert not use_feed, "transfomer doesn't support feed yet"
return transformer_model.transformer(
ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
ModelHyperParams.n_layer, ModelHyperParams.n_head,
ModelHyperParams.d_key, ModelHyperParams.d_value,
ModelHyperParams.d_model, ModelHyperParams.d_inner_hid,
ModelHyperParams.dropout, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx)
def get_model():
avg_cost = transformer(use_feed=False)
optimizer = fluid.optimizer.Adam()
optimizer.minimize(avg_cost)
return avg_cost
def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers):
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id=trainer_id,
program=main_program,
pservers=pserver_endpoints,
trainers=trainers)
return t
class DistTransformer2x2(object):
def run_pserver(self, pserver_endpoints, trainers, current_endpoint,
trainer_id):
get_model()
t = get_transpiler(trainer_id,
fluid.default_main_program(), pserver_endpoints,
trainers)
pserver_prog = t.get_pserver_program(current_endpoint)
startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
exe.run(pserver_prog)
def _wait_ps_ready(self, pid):
retry_times = 20
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(3)
print("waiting ps ready: ", pid)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
retry_times -= 1
def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True):
avg_cost = get_model()
if is_dist:
t = get_transpiler(trainer_id,
fluid.default_main_program(), endpoints,
trainers)
trainer_prog = t.get_trainer_program()
else:
trainer_prog = fluid.default_main_program()
startup_exe = fluid.Executor(place)
startup_exe.run(fluid.default_startup_program())
strategy = fluid.ExecutionStrategy()
strategy.num_threads = 1
strategy.allow_op_delay = False
exe = fluid.ParallelExecutor(
True, loss_name=avg_cost.name, exec_strategy=strategy)
first_loss, = exe.run(fetch_list=[avg_cost.name])
print(first_loss)
for i in xrange(5):
_ = exe.run(fetch_list=[avg_cost.name])
last_loss, = exe.run(fetch_list=[avg_cost.name])
print(last_loss)
def main(role="pserver",
endpoints="127.0.0.1:9123",
trainer_id=0,
current_endpoint="127.0.0.1:9123",
trainers=1,
is_dist=True):
reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size),
batch_size=transformer_model.batch_size)
with fluid.recordio_writer.create_recordio_writer(
WMT16_RECORDIO_FILE) as writer:
for batch in reader():
for tensor in prepare_batch_input(
batch, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head):
t = fluid.LoDTensor()
t.set(tensor, fluid.CPUPlace())
writer.append_tensor(t)
writer.complete_append_tensor()
model = DistTransformer2x2()
if role == "pserver":
model.run_pserver(endpoints, trainers, current_endpoint, trainer_id)
else:
p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
) else fluid.CPUPlace()
model.run_trainer(p, endpoints, trainer_id, trainers, is_dist)
if __name__ == "__main__":
if len(sys.argv) != 7:
print(
"Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]"
)
role = sys.argv[1]
endpoints = sys.argv[2]
trainer_id = int(sys.argv[3])
current_endpoint = sys.argv[4]
trainers = int(sys.argv[5])
is_dist = True if sys.argv[6] == "TRUE" else False
main(
role=role,
endpoints=endpoints,
trainer_id=trainer_id,
current_endpoint=current_endpoint,
trainers=trainers,
is_dist=is_dist)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import unittest
import os
import sys
import signal
import subprocess
class TestDistBase(unittest.TestCase):
def setUp(self):
self._trainers = 2
self._pservers = 2
self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
self._python_interp = "python"
def start_pserver(self, model_file):
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
ps0_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
(self._python_interp, model_file, self._ps_endpoints, ps0_ep,
self._trainers)
ps1_cmd = "%s %s pserver %s 0 %s %d TRUE" % \
(self._python_interp, model_file, self._ps_endpoints, ps1_ep,
self._trainers)
ps0_proc = subprocess.Popen(
ps0_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ps1_proc = subprocess.Popen(
ps1_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return ps0_proc, ps1_proc
def _wait_ps_ready(self, pid):
retry_times = 50
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(3)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error as e:
sys.stderr.write('waiting for pserver: %s, left retry %d\n' %
(e, retry_times))
retry_times -= 1
def check_with_place(self, model_file, delta=1e-3):
# *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
required_envs = {
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH"),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15"
}
# Run local to get a base line
env_local = {"CUDA_VISIBLE_DEVICES": "0"}
env_local.update(required_envs)
local_cmd = "%s %s trainer %s 0 %s %d FLASE" % \
(self._python_interp, model_file,
"127.0.0.1:1234", "127.0.0.1:1234", 1)
local_proc = subprocess.Popen(
local_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_local)
local_proc.wait()
out, err = local_proc.communicate()
local_ret = out
sys.stderr.write('local_loss: %s\n' % local_ret)
sys.stderr.write('local_stderr: %s\n' % err)
# Run dist train to compare with local results
ps0, ps1 = self.start_pserver(model_file)
self._wait_ps_ready(ps0.pid)
self._wait_ps_ready(ps1.pid)
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
tr0_cmd = "%s %s trainer %s 0 %s %d TRUE" % \
(self._python_interp, model_file, self._ps_endpoints, ps0_ep,
self._trainers)
tr1_cmd = "%s %s trainer %s 1 %s %d TRUE" % \
(self._python_interp, model_file, self._ps_endpoints, ps1_ep,
self._trainers)
env0 = {"CUDA_VISIBLE_DEVICES": "0"}
env1 = {"CUDA_VISIBLE_DEVICES": "1"}
env0.update(required_envs)
env1.update(required_envs)
FNULL = open(os.devnull, 'w')
tr0_proc = subprocess.Popen(
tr0_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env0)
tr1_proc = subprocess.Popen(
tr1_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env1)
tr0_proc.wait()
tr1_proc.wait()
out, err = tr0_proc.communicate()
sys.stderr.write('dist_stderr: %s\n' % err)
loss_data0 = out
sys.stderr.write('dist_loss: %s\n' % loss_data0)
lines = loss_data0.split("\n")
dist_first_loss = eval(lines[0].replace(" ", ","))[0]
dist_last_loss = eval(lines[1].replace(" ", ","))[0]
local_lines = local_ret.split("\n")
local_first_loss = eval(local_lines[0])[0]
local_last_loss = eval(local_lines[1])[0]
self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta)
self.assertAlmostEqual(local_last_loss, dist_last_loss, delta=delta)
# check tr0_out
# FIXME: ensure the server process is killed
# replace with ps0.terminate()
os.kill(ps0.pid, signal.SIGKILL)
os.kill(ps1.pid, signal.SIGKILL)
FNULL.close()
...@@ -11,127 +11,14 @@ ...@@ -11,127 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import numpy as np
import argparse
import time
import math
import unittest import unittest
import os from test_dist_base import TestDistBase
import sys
import signal
import subprocess
class TestDistSeResneXt2x2(unittest.TestCase):
def setUp(self):
self._trainers = 2
self._pservers = 2
self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124"
self._python_interp = "python"
def start_pserver(self):
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
ps0_cmd = "%s dist_se_resnext.py pserver %s 0 %s %d TRUE" % \
(self._python_interp, self._ps_endpoints, ps0_ep, self._trainers)
ps1_cmd = "%s dist_se_resnext.py pserver %s 0 %s %d TRUE" % \
(self._python_interp, self._ps_endpoints, ps1_ep, self._trainers)
ps0_proc = subprocess.Popen(
ps0_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ps1_proc = subprocess.Popen(
ps1_cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return ps0_proc, ps1_proc
def _wait_ps_ready(self, pid):
retry_times = 20
while True:
assert retry_times >= 0, "wait ps ready failed"
time.sleep(3)
try:
# the listen_and_serv_op would touch a file which contains the listen port
# on the /tmp directory until it was ready to process all the RPC call.
os.stat("/tmp/paddle.%d.port" % pid)
return
except os.error:
retry_times -= 1
def test_with_place(self):
# *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
required_envs = {
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH"),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15"
}
# Run local to get a base line
env_local = {"CUDA_VISIBLE_DEVICES": "0"}
env_local.update(required_envs)
local_cmd = "%s dist_se_resnext.py trainer %s 0 %s %d FLASE" % \
(self._python_interp, "127.0.0.1:1234", "127.0.0.1:1234", 1)
local_proc = subprocess.Popen(
local_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_local)
local_proc.wait()
out, err = local_proc.communicate()
local_ret = out
sys.stderr.write('local_loss: %s\n' % local_ret)
sys.stderr.write('local_stderr: %s\n' % err)
# Run dist train to compare with local results
ps0, ps1 = self.start_pserver()
self._wait_ps_ready(ps0.pid)
self._wait_ps_ready(ps1.pid)
ps0_ep, ps1_ep = self._ps_endpoints.split(",")
tr0_cmd = "%s dist_se_resnext.py trainer %s 0 %s %d TRUE" % \
(self._python_interp, self._ps_endpoints, ps0_ep, self._trainers)
tr1_cmd = "%s dist_se_resnext.py trainer %s 1 %s %d TRUE" % \
(self._python_interp, self._ps_endpoints, ps1_ep, self._trainers)
env0 = {"CUDA_VISIBLE_DEVICES": "0"}
env1 = {"CUDA_VISIBLE_DEVICES": "1"}
env0.update(required_envs)
env1.update(required_envs)
FNULL = open(os.devnull, 'w')
tr0_proc = subprocess.Popen(
tr0_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env0)
tr1_proc = subprocess.Popen(
tr1_cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env1)
tr0_proc.wait()
tr1_proc.wait()
out, err = tr0_proc.communicate()
sys.stderr.write('dist_stderr: %s\n' % err)
loss_data0 = out
sys.stderr.write('dist_loss: %s\n' % loss_data0)
lines = loss_data0.split("\n")
dist_first_loss = eval(lines[0].replace(" ", ","))[0]
dist_last_loss = eval(lines[1].replace(" ", ","))[0]
local_lines = local_ret.split("\n")
local_first_loss = eval(local_lines[0])[0]
local_last_loss = eval(local_lines[1])[0]
self.assertAlmostEqual(local_first_loss, dist_first_loss)
self.assertAlmostEqual(local_last_loss, dist_last_loss)
# check tr0_out class TestDistSeResneXt2x2(TestDistBase):
# FIXME: ensure the server process is killed def test_se_resnext(self):
# replace with ps0.terminate() # TODO(paddle-dev): Is the delta too large?
os.kill(ps0.pid, signal.SIGKILL) self.check_with_place("dist_se_resnext.py", delta=0.2)
os.kill(ps1.pid, signal.SIGKILL)
FNULL.close()
if __name__ == "__main__": if __name__ == "__main__":
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test_dist_base import TestDistBase
class TestDistTransformer2x2(TestDistBase):
def test_transformer(self):
# TODO(paddle-dev): check if the delta is OK.
# Usually start around ~8000 and converge to ~5000
self.check_with_place("dist_transformer.py", delta=400)
if __name__ == "__main__":
unittest.main()
...@@ -21,7 +21,7 @@ import paddle ...@@ -21,7 +21,7 @@ import paddle
import paddle.dataset.wmt16 as wmt16 import paddle.dataset.wmt16 as wmt16
import os import os
WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio" WMT16_RECORDIO_FILE = "/tmp/wmt16.recordio"
class ModelHyperParams(object): class ModelHyperParams(object):
......
...@@ -403,7 +403,7 @@ def transformer( ...@@ -403,7 +403,7 @@ def transformer(
trg_pad_idx, trg_pad_idx,
pos_pad_idx, ): pos_pad_idx, ):
file_obj = fluid.layers.open_recordio_file( file_obj = fluid.layers.open_recordio_file(
filename='./wmt16.recordio', filename='/tmp/wmt16.recordio',
shapes=[ shapes=[
[batch_size * max_length, 1], [batch_size * max_length, 1],
[batch_size * max_length, 1], [batch_size * max_length, 1],
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册