提交 1992f709 编写于 作者: L Luo Tao

Merge branch 'develop' into refine_relu_test

...@@ -25,7 +25,6 @@ message(STATUS "CXX compiler: ${CMAKE_CXX_COMPILER}, version: " ...@@ -25,7 +25,6 @@ message(STATUS "CXX compiler: ${CMAKE_CXX_COMPILER}, version: "
message(STATUS "C compiler: ${CMAKE_C_COMPILER}, version: " message(STATUS "C compiler: ${CMAKE_C_COMPILER}, version: "
"${CMAKE_C_COMPILER_ID} ${CMAKE_C_COMPILER_VERSION}") "${CMAKE_C_COMPILER_ID} ${CMAKE_C_COMPILER_VERSION}")
find_package(Sphinx)
if(NOT CMAKE_CROSSCOMPILING) if(NOT CMAKE_CROSSCOMPILING)
find_package(CUDA QUIET) find_package(CUDA QUIET)
endif(NOT CMAKE_CROSSCOMPILING) endif(NOT CMAKE_CROSSCOMPILING)
...@@ -226,5 +225,7 @@ if(WITH_PYTHON) ...@@ -226,5 +225,7 @@ if(WITH_PYTHON)
endif() endif()
if(WITH_DOC) if(WITH_DOC)
find_package(Sphinx REQUIRED)
find_python_module(recommonmark REQUIRED)
add_subdirectory(doc) add_subdirectory(doc)
endif() endif()
...@@ -192,6 +192,10 @@ class ExecutionContext { ...@@ -192,6 +192,10 @@ class ExecutionContext {
return op_.Attr<T>(name); return op_.Attr<T>(name);
} }
bool HasInput(const std::string& name) const { return op_.HasInputs(name); }
bool HasOutput(const std::string& name) const { return op_.HasOutputs(name); }
size_t InputSize(const std::string& name) const { size_t InputSize(const std::string& name) const {
return op_.Inputs(name).size(); return op_.Inputs(name).size();
} }
......
...@@ -58,7 +58,8 @@ ParallelExecutor::ParallelExecutor( ...@@ -58,7 +58,8 @@ ParallelExecutor::ParallelExecutor(
const std::unordered_set<std::string> &bcast_vars, const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name, const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay, Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
bool use_default_grad_scale, bool balance_parameter_opt_between_cards) bool use_default_grad_scale, bool balance_parameter_opt_between_cards,
size_t num_trainers, size_t trainer_id)
: member_(new ParallelExecutorPrivate(places)) { : member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope; member_->global_scope_ = scope;
...@@ -80,7 +81,13 @@ ParallelExecutor::ParallelExecutor( ...@@ -80,7 +81,13 @@ ParallelExecutor::ParallelExecutor(
// Bcast Parameters to all GPUs // Bcast Parameters to all GPUs
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_)); auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
ncclUniqueId *nccl_id = nullptr;
if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
}
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
member_->places_, nccl_id, num_trainers, trainer_id));
#endif #endif
if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 && if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 &&
local_scopes.empty()) { // Is CUDA local_scopes.empty()) { // Is CUDA
......
...@@ -41,7 +41,8 @@ class ParallelExecutor { ...@@ -41,7 +41,8 @@ class ParallelExecutor {
const std::string& loss_var_name, Scope* scope, const std::string& loss_var_name, Scope* scope,
const std::vector<Scope*>& local_scopes, const std::vector<Scope*>& local_scopes,
bool allow_op_delay, bool use_default_grad_scale, bool allow_op_delay, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards); bool balance_parameter_opt_between_cards,
size_t num_trainers = 1, size_t trainer_id = 0);
~ParallelExecutor(); ~ParallelExecutor();
......
...@@ -21,15 +21,18 @@ namespace tensorrt { ...@@ -21,15 +21,18 @@ namespace tensorrt {
class ReluOpConverter : public OpConverter { class ReluOpConverter : public OpConverter {
public: public:
ReluOpConverter() {} ReluOpConverter() {}
void operator()(const framework::OpDesc& op) override { void operator()(const framework::proto::OpDesc& op) override {
// Here the two nullptr looks strange, that's because the
// framework::OpDesc's constructor is strange.
framework::OpDesc op_desc(op, nullptr, nullptr);
LOG(INFO) << "convert a fluid relu op to tensorrt activation layer whose " LOG(INFO) << "convert a fluid relu op to tensorrt activation layer whose "
"type is Relu"; "type is Relu";
const nvinfer1::ITensor* input_tensor = const nvinfer1::ITensor* input_tensor =
engine_->GetITensor(op.Input("X")[0]); engine_->GetITensor(op_desc.Input("X")[0]);
nvinfer1::IActivationLayer* layer = TRT_ENGINE_ADD_LAYER( nvinfer1::IActivationLayer* layer = TRT_ENGINE_ADD_LAYER(
engine_, Activation, *const_cast<nvinfer1::ITensor*>(input_tensor), engine_, Activation, *const_cast<nvinfer1::ITensor*>(input_tensor),
nvinfer1::ActivationType::kRELU); nvinfer1::ActivationType::kRELU);
engine_->SetITensor(op.Output("Out")[0], layer->getOutput(0)); engine_->SetITensor(op_desc.Output("Out")[0], layer->getOutput(0));
} }
}; };
......
...@@ -21,7 +21,7 @@ namespace tensorrt { ...@@ -21,7 +21,7 @@ namespace tensorrt {
class Conv2dOpConverter : public OpConverter { class Conv2dOpConverter : public OpConverter {
public: public:
Conv2dOpConverter() {} Conv2dOpConverter() {}
void operator()(const framework::OpDesc& op) override { void operator()(const framework::proto::OpDesc& op) override {
LOG(INFO) LOG(INFO)
<< "convert a fluid conv2d op to tensorrt conv layer without bias"; << "convert a fluid conv2d op to tensorrt conv layer without bias";
} }
......
...@@ -39,7 +39,7 @@ class DefaultIOConverter : public EngineIOConverter { ...@@ -39,7 +39,7 @@ class DefaultIOConverter : public EngineIOConverter {
cudaMemcpyHostToDevice, *stream_)); cudaMemcpyHostToDevice, *stream_));
} else if (is_gpu_place(place)) { } else if (is_gpu_place(place)) {
PADDLE_ENFORCE_EQ(0, cudaMemcpyAsync(out, in.data<float>(), size, PADDLE_ENFORCE_EQ(0, cudaMemcpyAsync(out, in.data<float>(), size,
cudaMemcpyHostToHost, *stream_)); cudaMemcpyDeviceToDevice, *stream_));
} else { } else {
PADDLE_THROW("Unknown device for converter"); PADDLE_THROW("Unknown device for converter");
} }
......
...@@ -21,7 +21,7 @@ namespace tensorrt { ...@@ -21,7 +21,7 @@ namespace tensorrt {
class MulOpConverter : public OpConverter { class MulOpConverter : public OpConverter {
public: public:
MulOpConverter() {} MulOpConverter() {}
void operator()(const framework::OpDesc& op) override { void operator()(const framework::proto::OpDesc& op) override {
LOG(INFO) << "convert a fluid mul op to tensorrt fc layer without bias"; LOG(INFO) << "convert a fluid mul op to tensorrt fc layer without bias";
} }
}; };
......
...@@ -31,10 +31,10 @@ namespace tensorrt { ...@@ -31,10 +31,10 @@ namespace tensorrt {
class OpConverter { class OpConverter {
public: public:
OpConverter() {} OpConverter() {}
virtual void operator()(const framework::OpDesc& op) {} virtual void operator()(const framework::proto::OpDesc& op) {}
void Run(const framework::OpDesc& op, TensorRTEngine* engine) { void Run(const framework::proto::OpDesc& op, TensorRTEngine* engine) {
std::string type = op.Type(); std::string type = op.type();
auto* it = Registry<OpConverter>::Lookup(type); auto* it = Registry<OpConverter>::Lookup(type);
PADDLE_ENFORCE_NOT_NULL(it, "no OpConverter for optype [%s]", type); PADDLE_ENFORCE_NOT_NULL(it, "no OpConverter for optype [%s]", type);
it->SetEngine(engine); it->SetEngine(engine);
...@@ -42,14 +42,16 @@ class OpConverter { ...@@ -42,14 +42,16 @@ class OpConverter {
} }
// convert fluid op to tensorrt layer // convert fluid op to tensorrt layer
void ConvertOp(const framework::OpDesc& op, TensorRTEngine* engine) { void ConvertOp(const framework::proto::OpDesc& op, TensorRTEngine* engine) {
OpConverter::Run(op, engine); OpConverter::Run(op, engine);
} }
// convert fluid block to tensorrt network // convert fluid block to tensorrt network
void ConvertBlock(const framework::BlockDesc& block, TensorRTEngine* engine) { void ConvertBlock(const framework::proto::BlockDesc& block,
for (auto op : block.AllOps()) { TensorRTEngine* engine) {
OpConverter::Run(*op, engine); for (size_t i = 0; i < block.ops_size(); i++) {
const auto& op = block.ops(i);
OpConverter::Run(op, engine);
} }
} }
......
...@@ -51,7 +51,7 @@ void Compare(const std::string op_type, float input, float expect) { ...@@ -51,7 +51,7 @@ void Compare(const std::string op_type, float input, float expect) {
op_desc.SetInput("X", {"X"}); op_desc.SetInput("X", {"X"});
op_desc.SetOutput("Out", {"Out"}); op_desc.SetOutput("Out", {"Out"});
auto op = framework::OpRegistry::CreateOp(op_desc); auto op = framework::OpRegistry::CreateOp(*op_desc.Proto());
// run fluid op // run fluid op
op->Run(scope, place); op->Run(scope, place);
...@@ -68,7 +68,8 @@ void Compare(const std::string op_type, float input, float expect) { ...@@ -68,7 +68,8 @@ void Compare(const std::string op_type, float input, float expect) {
nvinfer1::DimsCHW{1, 1, 1}); nvinfer1::DimsCHW{1, 1, 1});
// convert op // convert op
OpConverter op_converter; OpConverter op_converter;
op_converter.ConvertOp(op_desc, engine); op_converter.ConvertOp(*op_desc.Proto(), engine);
engine->DeclareOutput("Out"); engine->DeclareOutput("Out");
engine->FreezeNetwork(); engine->FreezeNetwork();
......
...@@ -29,7 +29,7 @@ TEST(OpConverter, ConvertBlock) { ...@@ -29,7 +29,7 @@ TEST(OpConverter, ConvertBlock) {
conv2d_op->SetType("conv2d"); conv2d_op->SetType("conv2d");
OpConverter converter; OpConverter converter;
converter.ConvertBlock(*block, nullptr /*TensorRTEngine*/); converter.ConvertBlock(*block->Proto(), nullptr /*TensorRTEngine*/);
} }
} // namespace tensorrt } // namespace tensorrt
......
...@@ -16,7 +16,6 @@ limitations under the License. */ ...@@ -16,7 +16,6 @@ limitations under the License. */
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "paddle/fluid/inference/tests/test_helper.h" #include "paddle/fluid/inference/tests/test_helper.h"
DEFINE_string(data_set, "cifar10", "Data set to test");
DEFINE_string(dirname, "", "Directory of the inference model."); DEFINE_string(dirname, "", "Directory of the inference model.");
DEFINE_string(fp16_dirname, "", "Directory of the float16 inference model."); DEFINE_string(fp16_dirname, "", "Directory of the float16 inference model.");
DEFINE_int32(batch_size, 1, "Batch size of input data"); DEFINE_int32(batch_size, 1, "Batch size of input data");
...@@ -35,19 +34,19 @@ TEST(inference, image_classification) { ...@@ -35,19 +34,19 @@ TEST(inference, image_classification) {
// 0. Call `paddle::framework::InitDevices()` initialize all the devices // 0. Call `paddle::framework::InitDevices()` initialize all the devices
// In unittests, this is done in paddle/testing/paddle_gtest_main.cc // In unittests, this is done in paddle/testing/paddle_gtest_main.cc
const bool is_combined = false;
std::vector<std::vector<int64_t>> feed_target_shapes =
GetFeedTargetShapes(dirname, is_combined);
paddle::framework::LoDTensor input; paddle::framework::LoDTensor input;
// Use normilized image pixels as input data, // Use normilized image pixels as input data,
// which should be in the range [0.0, 1.0]. // which should be in the range [0.0, 1.0].
if (FLAGS_data_set == "cifar10") { feed_target_shapes[0][0] = FLAGS_batch_size;
SetupTensor<float>(&input, {FLAGS_batch_size, 3, 32, 32}, paddle::framework::DDim input_dims =
static_cast<float>(0), static_cast<float>(1)); paddle::framework::make_ddim(feed_target_shapes[0]);
} else if (FLAGS_data_set == "imagenet") { LOG(INFO) << input_dims;
SetupTensor<float>(&input, {FLAGS_batch_size, 3, 224, 224}, SetupTensor<float>(&input, input_dims, static_cast<float>(0),
static_cast<float>(0), static_cast<float>(1)); static_cast<float>(1));
} else {
LOG(FATAL) << "Only cifar10 or imagenet is supported.";
}
std::vector<paddle::framework::LoDTensor*> cpu_feeds; std::vector<paddle::framework::LoDTensor*> cpu_feeds;
cpu_feeds.push_back(&input); cpu_feeds.push_back(&input);
...@@ -60,7 +59,7 @@ TEST(inference, image_classification) { ...@@ -60,7 +59,7 @@ TEST(inference, image_classification) {
LOG(INFO) << "--- CPU Runs: ---"; LOG(INFO) << "--- CPU Runs: ---";
LOG(INFO) << "Batch size is " << FLAGS_batch_size; LOG(INFO) << "Batch size is " << FLAGS_batch_size;
TestInference<paddle::platform::CPUPlace, false, true>( TestInference<paddle::platform::CPUPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat); dirname, cpu_feeds, cpu_fetchs1, FLAGS_repeat, is_combined);
LOG(INFO) << output1.dims(); LOG(INFO) << output1.dims();
} }
...@@ -73,7 +72,7 @@ TEST(inference, image_classification) { ...@@ -73,7 +72,7 @@ TEST(inference, image_classification) {
LOG(INFO) << "--- GPU Runs: ---"; LOG(INFO) << "--- GPU Runs: ---";
LOG(INFO) << "Batch size is " << FLAGS_batch_size; LOG(INFO) << "Batch size is " << FLAGS_batch_size;
TestInference<paddle::platform::CUDAPlace, false, true>( TestInference<paddle::platform::CUDAPlace, false, true>(
dirname, cpu_feeds, cpu_fetchs2, FLAGS_repeat); dirname, cpu_feeds, cpu_fetchs2, FLAGS_repeat, is_combined);
LOG(INFO) << output2.dims(); LOG(INFO) << output2.dims();
if (!FLAGS_skip_cpu) { if (!FLAGS_skip_cpu) {
......
...@@ -89,6 +89,50 @@ void CheckError(const paddle::framework::LoDTensor& output1, ...@@ -89,6 +89,50 @@ void CheckError(const paddle::framework::LoDTensor& output1,
EXPECT_EQ(count, 0U) << "There are " << count << " different elements."; EXPECT_EQ(count, 0U) << "There are " << count << " different elements.";
} }
std::unique_ptr<paddle::framework::ProgramDesc> InitProgram(
paddle::framework::Executor* executor, paddle::framework::Scope* scope,
const std::string& dirname, const bool is_combined = false) {
std::unique_ptr<paddle::framework::ProgramDesc> inference_program;
if (is_combined) {
// All parameters are saved in a single file.
// Hard-coding the file names of program and parameters in unittest.
// The file names should be consistent with that used in Python API
// `fluid.io.save_inference_model`.
std::string prog_filename = "__model_combined__";
std::string param_filename = "__params_combined__";
inference_program =
paddle::inference::Load(executor, scope, dirname + "/" + prog_filename,
dirname + "/" + param_filename);
} else {
// Parameters are saved in separate files sited in the specified
// `dirname`.
inference_program = paddle::inference::Load(executor, scope, dirname);
}
return inference_program;
}
std::vector<std::vector<int64_t>> GetFeedTargetShapes(
const std::string& dirname, const bool is_combined = false) {
auto place = paddle::platform::CPUPlace();
auto executor = paddle::framework::Executor(place);
auto* scope = new paddle::framework::Scope();
auto inference_program = InitProgram(&executor, scope, dirname, is_combined);
auto& global_block = inference_program->Block(0);
const std::vector<std::string>& feed_target_names =
inference_program->GetFeedTargetNames();
std::vector<std::vector<int64_t>> feed_target_shapes;
for (size_t i = 0; i < feed_target_names.size(); ++i) {
auto* var = global_block.FindVar(feed_target_names[i]);
std::vector<int64_t> var_shape = var->GetShape();
feed_target_shapes.push_back(var_shape);
}
delete scope;
return feed_target_shapes;
}
template <typename Place, bool CreateVars = true, bool PrepareContext = false> template <typename Place, bool CreateVars = true, bool PrepareContext = false>
void TestInference(const std::string& dirname, void TestInference(const std::string& dirname,
const std::vector<paddle::framework::LoDTensor*>& cpu_feeds, const std::vector<paddle::framework::LoDTensor*>& cpu_feeds,
...@@ -124,22 +168,7 @@ void TestInference(const std::string& dirname, ...@@ -124,22 +168,7 @@ void TestInference(const std::string& dirname,
paddle::platform::RecordEvent record_event( paddle::platform::RecordEvent record_event(
"init_program", "init_program",
paddle::platform::DeviceContextPool::Instance().Get(place)); paddle::platform::DeviceContextPool::Instance().Get(place));
inference_program = InitProgram(&executor, scope, dirname, is_combined);
if (is_combined) {
// All parameters are saved in a single file.
// Hard-coding the file names of program and parameters in unittest.
// The file names should be consistent with that used in Python API
// `fluid.io.save_inference_model`.
std::string prog_filename = "__model_combined__";
std::string param_filename = "__params_combined__";
inference_program = paddle::inference::Load(
&executor, scope, dirname + "/" + prog_filename,
dirname + "/" + param_filename);
} else {
// Parameters are saved in separate files sited in the specified
// `dirname`.
inference_program = paddle::inference::Load(&executor, scope, dirname);
}
} }
// Disable the profiler and print the timing information // Disable the profiler and print the timing information
paddle::platform::DisableProfiler(paddle::platform::EventSortingKey::kDefault, paddle::platform::DisableProfiler(paddle::platform::EventSortingKey::kDefault,
......
...@@ -186,6 +186,11 @@ endif() ...@@ -186,6 +186,11 @@ endif()
add_subdirectory(detail) add_subdirectory(detail)
if(WITH_DISTRIBUTE) if(WITH_DISTRIBUTE)
if(WITH_GPU)
op_library(gen_nccl_id_op DEPS nccl_common)
else()
set(DEPS_OPS ${DEPS_OPS} gen_nccl_id_op)
endif()
set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
op_library(send_op DEPS ${DISTRIBUTE_DEPS}) op_library(send_op DEPS ${DISTRIBUTE_DEPS})
...@@ -202,8 +207,9 @@ if(WITH_DISTRIBUTE) ...@@ -202,8 +207,9 @@ if(WITH_DISTRIBUTE)
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor) cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor)
cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS send_op listen_and_serv_op executor)
else() else()
set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op) set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op gen_nccl_id_op)
endif() endif()
op_library(cross_entropy_op DEPS cross_entropy) op_library(cross_entropy_op DEPS cross_entropy)
......
...@@ -52,7 +52,7 @@ bool RPCClient::AsyncSendVariable(const std::string& ep, ...@@ -52,7 +52,7 @@ bool RPCClient::AsyncSendVariable(const std::string& ep,
// stub context // stub context
SendProcessor* s = new SendProcessor(ch); SendProcessor* s = new SendProcessor(ch);
s->Prepare(var_h, time_out); s->Prepare(var_h, time_out);
s->response_call_back_ = NULL; s->response_call_back_ = nullptr;
auto call = s->stub_g_.PrepareUnaryCall( auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_); s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_);
......
...@@ -57,7 +57,9 @@ void ProcGetResponse(const VarHandle& var_h, const grpc::ByteBuffer& msg); ...@@ -57,7 +57,9 @@ void ProcGetResponse(const VarHandle& var_h, const grpc::ByteBuffer& msg);
class BaseProcessor { class BaseProcessor {
public: public:
explicit BaseProcessor(std::shared_ptr<grpc::Channel> ch) { context_ = NULL; } explicit BaseProcessor(std::shared_ptr<grpc::Channel> ch) {
context_ = nullptr;
}
virtual ~BaseProcessor() {} virtual ~BaseProcessor() {}
...@@ -105,7 +107,7 @@ class SendProcessor : public BaseProcessor { ...@@ -105,7 +107,7 @@ class SendProcessor : public BaseProcessor {
::grpc::GenericStub stub_g_; ::grpc::GenericStub stub_g_;
::grpc::ByteBuffer reply_; ::grpc::ByteBuffer reply_;
RequestSendCallBack response_call_back_ = NULL; RequestSendCallBack response_call_back_ = nullptr;
}; };
typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)> typedef std::function<void(const VarHandle&, const ::grpc::ByteBuffer&)>
......
...@@ -47,6 +47,7 @@ class AsyncGRPCServer final { ...@@ -47,6 +47,7 @@ class AsyncGRPCServer final {
explicit AsyncGRPCServer(const std::string &address, bool sync_mode) explicit AsyncGRPCServer(const std::string &address, bool sync_mode)
: address_(address), sync_mode_(sync_mode), ready_(0) {} : address_(address), sync_mode_(sync_mode), ready_(0) {}
~AsyncGRPCServer() {}
void WaitServerReady(); void WaitServerReady();
void RunSyncUpdate(); void RunSyncUpdate();
......
...@@ -32,6 +32,7 @@ service SendRecvService { ...@@ -32,6 +32,7 @@ service SendRecvService {
enum VarType { enum VarType {
LOD_TENSOR = 0; LOD_TENSOR = 0;
SELECTED_ROWS = 1; SELECTED_ROWS = 1;
NCCL_ID = 2;
} }
// NOTICE(gongwb):don't modify this proto if you are not // NOTICE(gongwb):don't modify this proto if you are not
......
...@@ -14,6 +14,9 @@ limitations under the License. */ ...@@ -14,6 +14,9 @@ limitations under the License. */
#include "paddle/fluid/operators/detail/sendrecvop_utils.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h"
#ifdef PADDLE_WITH_CUDA
#include <nccl.h>
#endif
#include <sys/time.h> #include <sys/time.h>
#include <thread> // NOLINT #include <thread> // NOLINT
...@@ -129,6 +132,10 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -129,6 +132,10 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
} else if (var->IsType<framework::SelectedRows>()) { } else if (var->IsType<framework::SelectedRows>()) {
request.set_type(::sendrecv::SELECTED_ROWS); request.set_type(::sendrecv::SELECTED_ROWS);
GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size); GetSelectedRowsPayload(var, ctx, &request, &payload, &payload_size);
#ifdef PADDLE_WITH_CUDA
} else if (var->IsType<ncclUniqueId>()) {
request.set_type(::sendrecv::NCCL_ID);
#endif
} else { } else {
PADDLE_THROW("Serialize does not support type: %s", PADDLE_THROW("Serialize does not support type: %s",
typeid(var->Type()).name()); typeid(var->Type()).name());
...@@ -149,6 +156,24 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -149,6 +156,24 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
void* buf = buffer.get(); void* buf = buffer.get();
ProtoEncodeHelper e(static_cast<char*>(buf), 1024); ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
e.WriteRawBytes(std::string(header.data(), header.size())); e.WriteRawBytes(std::string(header.data(), header.size()));
// NCCLID is copied directly to the message, return bytebuffer
// with only one slice if serializing NCCLID.
#ifdef PADDLE_WITH_CUDA
if (var->IsType<ncclUniqueId>()) {
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
NCCL_UNIQUE_ID_BYTES);
const ncclUniqueId& uid = var->Get<ncclUniqueId>();
e.WriteRawBytes(std::string(uid.internal, NCCL_UNIQUE_ID_BYTES));
// for serialize NCCL_ID
::grpc::Slice slices(e.size());
memcpy(const_cast<uint8_t*>(slices.begin()), e.data(), e.size());
::grpc::ByteBuffer tmp(&slices, 1);
msg->Swap(&tmp);
return;
}
#endif
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
// steal reference of tensor data // steal reference of tensor data
::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows ::grpc::Slice slices[4]; // metadata, tensor, rows meta, rows
......
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
#ifdef PADDLE_WITH_CUDA
#include <nccl.h>
#endif
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h"
...@@ -368,7 +371,8 @@ int VariableResponse::Parse(Source* source) { ...@@ -368,7 +371,8 @@ int VariableResponse::Parse(Source* source) {
} }
case sendrecv::VariableMessage::kSerializedFieldNumber: { case sendrecv::VariableMessage::kSerializedFieldNumber: {
PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS ||
meta_.type() == sendrecv::LOD_TENSOR) && meta_.type() == sendrecv::LOD_TENSOR ||
meta_.type() == sendrecv::NCCL_ID) &&
meta_.varname() != "", meta_.varname() != "",
"meta info should be got first!"); "meta info should be got first!");
...@@ -378,6 +382,22 @@ int VariableResponse::Parse(Source* source) { ...@@ -378,6 +382,22 @@ int VariableResponse::Parse(Source* source) {
return tag; return tag;
} }
if (meta_.type() == sendrecv::NCCL_ID) {
#ifdef PADDLE_WITH_CUDA
auto* var = scope_->FindVar(meta_.varname());
if (var != nullptr) {
ncclUniqueId* id = var->GetMutable<ncclUniqueId>();
if (!ReadRaw(&input, *dev_ctx_, platform::CPUPlace(), id->internal,
num_bytes)) {
return tag;
}
}
break;
#else
PADDLE_THROW("Not compiled with CUDA!");
#endif
}
framework::DDim dims = GetDims(meta_.dims()); framework::DDim dims = GetDims(meta_.dims());
if (meta_.type() == sendrecv::LOD_TENSOR) { if (meta_.type() == sendrecv::LOD_TENSOR) {
PADDLE_ENFORCE(meta_.lod_size() >= 0, PADDLE_ENFORCE(meta_.lod_size() >= 0,
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <nccl.h>
#include <stdint.h>
#include <ostream>
#include <string>
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/threadpool.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/detail/grpc_server.h"
#include "paddle/fluid/platform/nccl_helper.h"
namespace paddle {
namespace operators {
class GenNCCLIdOp : public framework::OperatorBase {
public:
GenNCCLIdOp(const std::string& type, const framework::VariableNameMap& inputs,
const framework::VariableNameMap& outputs,
const framework::AttributeMap& attrs)
: OperatorBase(type, inputs, outputs, attrs) {}
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
// put nccl id in CPUPlace
auto& dev_ctx = *pool.Get(platform::CPUPlace());
int trainer_id = Attr<int>("trainer_id");
framework::Scope& local_scope = scope.NewScope();
if (trainer_id == 0) {
GenerateAndSend(&local_scope, dev_ctx);
} else {
GetIdByServer(&local_scope, dev_ctx);
}
}
private:
void GenerateAndSend(framework::Scope* scope,
const platform::DeviceContext& dev_ctx) const {
auto var = scope->FindVar(NCCL_ID_VARNAME);
PADDLE_ENFORCE_NOT_NULL(var);
auto id = var->GetMutable<ncclUniqueId>();
PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id));
std::vector<std::string> endpoint_list =
Attr<std::vector<std::string>>("endpoint_list");
detail::RPCClient client;
for (auto& ep : endpoint_list) {
VLOG(3) << "sending nccl id to " << ep;
client.AsyncSendVariable(ep, dev_ctx, *scope, NCCL_ID_VARNAME);
}
client.Wait();
VLOG(3) << "sending completed...";
}
void GetIdByServer(framework::Scope* scope,
const platform::DeviceContext& dev_ctx) const {
std::string endpoint = Attr<std::string>("endpoint");
// NOTE: Can not use unique_ptr here because the default
// deleter will call GRPC Server's base class's dtor and
// that will cause a wired crash.
detail::AsyncGRPCServer rpc_service(endpoint, true);
framework::ProgramDesc empty_program;
framework::Executor executor(dev_ctx.GetPlace());
rpc_service.SetScope(scope);
rpc_service.SetDevCtx(&dev_ctx);
rpc_service.SetProgram(&empty_program);
rpc_service.SetExecutor(&executor);
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, &rpc_service));
rpc_service.SetCond(0);
VLOG(3) << "start getting nccl id from trainer 0...";
auto recv = rpc_service.Get();
VLOG(3) << "got nccl id and stop server...";
rpc_service.ShutDown();
VLOG(3) << "rpc server stopped";
server_thread.join();
}
};
class GenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddOutput("NCCLID", "Raw variable contains a NCCL UniqueId instaces.");
AddComment(R"DOC(
GenNCCLId operator
For trainer 0: generate a new UniqueId and send it to all the other trainers.
For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the server.
)DOC");
AddAttr<std::string>("endpoint",
"(string), e.g. 127.0.0.1:6175 "
"current listen endpoint");
AddAttr<std::vector<std::string>>(
"endpoint_list",
"['trainer1_ip:port', 'trainer2_ip:port', ...] "
"list of trainer endpoints start from trainer 1")
.SetDefault({});
AddAttr<int>("trainer_id",
"(int default 0) "
"The index of the trainer in distributed training.")
.SetDefault(0);
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(gen_nccl_id, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker);
...@@ -64,18 +64,22 @@ class LoDTensor2BatchFunctor { ...@@ -64,18 +64,22 @@ class LoDTensor2BatchFunctor {
bool is_reverse = false) const { bool is_reverse = false) const {
if (!is_cal_batch_lod) { if (!is_cal_batch_lod) {
auto lods = batch->lod(); auto lods = batch->lod();
PADDLE_ENFORCE_GT(lods.size(), 2UL); PADDLE_ENFORCE_GT(lods.size(), 2UL,
PADDLE_ENFORCE_EQ(lods[1].size(), "The LoD of LoDTensor should inlcude at least 2-level "
static_cast<size_t>(lod_tensor.dims()[0])); "sequence information.");
PADDLE_ENFORCE_EQ(
lods[1].size(), static_cast<size_t>(lod_tensor.dims()[0]),
"The LoD information should be consistent with the dims.");
CopyMatrixRowsFunctor<DeviceContext, T> to_batch; CopyMatrixRowsFunctor<DeviceContext, T> to_batch;
to_batch(context, lod_tensor, lods[1], batch, true); to_batch(context, lod_tensor, lods[1], batch, true);
return; return;
} }
auto lods = lod_tensor.lod(); auto lods = lod_tensor.lod();
auto lod = lods[0];
PADDLE_ENFORCE_EQ(lods.size(), 1UL, "Only support one level sequence now."); PADDLE_ENFORCE_EQ(lods.size(), 1UL, "Only support one level sequence now.");
auto lod = lods[0];
std::vector<SeqInfo> seq_info; std::vector<SeqInfo> seq_info;
for (size_t seq_id = 0; seq_id < lod.size() - 1; ++seq_id) { for (size_t seq_id = 0; seq_id < lod.size() - 1; ++seq_id) {
int length = lod[seq_id + 1] - lod[seq_id]; int length = lod[seq_id + 1] - lod[seq_id];
...@@ -157,9 +161,12 @@ class Batch2LoDTensorFunctor { ...@@ -157,9 +161,12 @@ class Batch2LoDTensorFunctor {
const framework::LoDTensor& batch, const framework::LoDTensor& batch,
framework::LoDTensor* lod_tensor) const { framework::LoDTensor* lod_tensor) const {
auto in_lod = batch.lod(); auto in_lod = batch.lod();
PADDLE_ENFORCE_GT(in_lod.size(), 2UL); PADDLE_ENFORCE_GT(in_lod.size(), 2UL,
PADDLE_ENFORCE_EQ(in_lod[1].size(), "The LoD of LoDTensor should inlcude at least 2-level "
static_cast<size_t>(lod_tensor->dims()[0])); "sequence information.");
PADDLE_ENFORCE_EQ(
in_lod[1].size(), static_cast<size_t>(lod_tensor->dims()[0]),
"The LoD information should be consistent with the dims.");
CopyMatrixRowsFunctor<DeviceContext, T> to_seq; CopyMatrixRowsFunctor<DeviceContext, T> to_seq;
to_seq(context, batch, in_lod[1], lod_tensor, false); to_seq(context, batch, in_lod[1], lod_tensor, false);
} }
......
...@@ -92,14 +92,16 @@ class ReshapeOp : public framework::OperatorWithKernel { ...@@ -92,14 +92,16 @@ class ReshapeOp : public framework::OperatorWithKernel {
} }
if (unk_dim_idx != -1) { if (unk_dim_idx != -1) {
output_shape[unk_dim_idx] = -in_size / capacity;
// in_size < 0 and is un-determinate in compile time, skip the check,
// for example, in_dims = [-1, 8, 1, 1], shape = [-1, 3, 8],
// capacity = -24, in_size = -8, output_shape[0] = 0
// the following check will fail.
if (in_size > 0) { if (in_size > 0) {
// in_size < 0 and is un-determinate in compile time, skip the check,
// for example, in_dims = [-1, 8, 1, 1], shape = [-1, 3, 8],
// capacity = -24, in_size = -8, output_shape[0] = 0
// the following check will fail.
output_shape[unk_dim_idx] = -in_size / capacity;
PADDLE_ENFORCE_EQ(output_shape[unk_dim_idx] * capacity, -in_size, PADDLE_ENFORCE_EQ(output_shape[unk_dim_idx] * capacity, -in_size,
"Invalid shape is given."); "Invalid shape is given.");
} else {
output_shape[unk_dim_idx] = -1;
} }
} else { } else {
PADDLE_ENFORCE_EQ(capacity, in_size, "Invalid shape is given."); PADDLE_ENFORCE_EQ(capacity, in_size, "Invalid shape is given.");
...@@ -122,7 +124,10 @@ class ReshapeKernel : public framework::OpKernel<T> { ...@@ -122,7 +124,10 @@ class ReshapeKernel : public framework::OpKernel<T> {
void Compute(const framework::ExecutionContext &ctx) const { void Compute(const framework::ExecutionContext &ctx) const {
auto *out = ctx.Output<framework::LoDTensor>("Out"); auto *out = ctx.Output<framework::LoDTensor>("Out");
auto *in = ctx.Input<framework::LoDTensor>("X"); auto *in = ctx.Input<framework::LoDTensor>("X");
auto *shape_tensor = ctx.Input<framework::LoDTensor>("Shape");
auto *shape_tensor = ctx.HasInput("Shape")
? ctx.Input<framework::LoDTensor>("Shape")
: nullptr;
framework::DDim out_dims = out->dims(); framework::DDim out_dims = out->dims();
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <unistd.h>
#include <string>
#include <thread> // NOLINT
#include "gtest/gtest.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
#include "paddle/fluid/platform/nccl_helper.h"
#include "paddle/fluid/string/printf.h"
USE_NO_KERNEL_OP(listen_and_serv);
namespace f = paddle::framework;
namespace p = paddle::platform;
namespace m = paddle::operators::math;
namespace detail = paddle::operators::detail;
namespace string = paddle::string;
std::unique_ptr<detail::AsyncGRPCServer> rpc_service;
void StartServer(std::atomic<bool>* initialized) {
f::Scope scope;
p::CPUPlace place;
scope.Var(NCCL_ID_VARNAME);
p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
auto& dev_ctx = *pool.Get(p::CPUPlace());
rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", true));
f::ProgramDesc empty_program;
f::Executor executor(dev_ctx.GetPlace());
rpc_service->SetScope(&scope);
rpc_service->SetDevCtx(&dev_ctx);
rpc_service->SetProgram(&empty_program);
rpc_service->SetExecutor(&executor);
std::thread server_thread(
std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service.get()));
*initialized = true;
rpc_service->SetCond(0);
auto recv = rpc_service->Get();
LOG(INFO) << "got nccl id and stop server...";
rpc_service->ShutDown();
server_thread.join();
}
TEST(SendNcclId, Normal) {
std::atomic<bool> initialized{false};
std::thread server_thread(StartServer, &initialized);
while (!initialized) {
}
// wait server to start
// sleep(2);
rpc_service->WaitServerReady();
f::Scope scope;
p::CPUPlace place;
p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
auto& dev_ctx = *pool.Get(p::CPUPlace());
auto var = scope.Var(NCCL_ID_VARNAME);
// var->SetType(f::proto::VarType_Type_RAW);
auto id = var->GetMutable<ncclUniqueId>();
p::dynload::ncclGetUniqueId(id);
int port = rpc_service->GetSelectedPort();
std::string ep = string::Sprintf("127.0.0.1:%d", port);
detail::RPCClient client;
client.AsyncSendVariable(ep, dev_ctx, scope, NCCL_ID_VARNAME);
client.Wait();
server_thread.join();
auto* ptr = rpc_service.release();
delete ptr;
}
...@@ -14,12 +14,15 @@ ...@@ -14,12 +14,15 @@
#pragma once #pragma once
#include <stdio.h>
#include <thread> // NOLINT #include <thread> // NOLINT
#include <typeindex> #include <typeindex>
#include <vector> #include <vector>
#include "paddle/fluid/platform/dynload/nccl.h" #include "paddle/fluid/platform/dynload/nccl.h"
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
#define NCCL_ID_VARNAME "NCCLID"
namespace paddle { namespace paddle {
namespace platform { namespace platform {
...@@ -73,7 +76,9 @@ struct NCCLContextMap { ...@@ -73,7 +76,9 @@ struct NCCLContextMap {
std::unordered_map<int, NCCLContext> contexts_; std::unordered_map<int, NCCLContext> contexts_;
std::vector<int> order_; std::vector<int> order_;
explicit NCCLContextMap(const std::vector<platform::Place> &places) { explicit NCCLContextMap(const std::vector<platform::Place> &places,
ncclUniqueId *nccl_id = nullptr,
size_t num_trainers = 1, size_t trainer_id = 0) {
PADDLE_ENFORCE(!places.empty()); PADDLE_ENFORCE(!places.empty());
order_.reserve(places.size()); order_.reserve(places.size());
for (auto &p : places) { for (auto &p : places) {
...@@ -85,18 +90,34 @@ struct NCCLContextMap { ...@@ -85,18 +90,34 @@ struct NCCLContextMap {
order_.size(), contexts_.size(), order_.size(), contexts_.size(),
"NCCL Context Map does not support contain two or more same device"); "NCCL Context Map does not support contain two or more same device");
if (places.size() > 1) { if (places.size() <= 1) {
std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]); return;
}
std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
// if pass nccl_id here, can assume we are doing multi node training
if (nccl_id == nullptr) {
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
comms.get(), static_cast<int>(order_.size()), order_.data()));
} else {
PADDLE_ENFORCE_GT(num_trainers, 1);
// TODO(wuyi): need to ensure each node have same number of GPUs
{ {
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex()); int nranks = num_trainers * order_.size();
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( NCCLGroupGuard gurad;
comms.get(), static_cast<int>(order_.size()), order_.data())); for (auto &gpu_id : order_) {
} int rank = trainer_id * order_.size() + gpu_id;
int i = 0; VLOG(3) << "init nccl rank: " << rank << " nranks: " << nranks;
for (auto &dev_id : order_) { PADDLE_ENFORCE(cudaSetDevice(gpu_id));
contexts_.at(dev_id).comm_ = comms[i++]; PADDLE_ENFORCE(platform::dynload::ncclCommInitRank(
comms.get() + gpu_id, nranks, *nccl_id, rank));
}
} }
} }
int i = 0;
for (auto &dev_id : order_) {
contexts_.at(dev_id).comm_ = comms[i++];
}
} }
NCCLContextMap(const NCCLContextMap &other) = delete; NCCLContextMap(const NCCLContextMap &other) = delete;
......
...@@ -503,12 +503,13 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -503,12 +503,13 @@ All parameter, weight, gradient are variables in Paddle.
const ProgramDesc &main_program, const std::string &loss_var_name, const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, std::vector<Scope *> &local_scopes, Scope *scope, std::vector<Scope *> &local_scopes,
bool allow_op_delay, bool use_default_grad_scale, bool allow_op_delay, bool use_default_grad_scale,
bool balance_parameter_opt_between_cards) { bool balance_parameter_opt_between_cards, size_t num_trainers,
size_t trainer_id) {
new (&self) ParallelExecutor( new (&self) ParallelExecutor(
num_threads, use_event, places, params, bcast_vars, num_threads, use_event, places, params, bcast_vars,
main_program, loss_var_name, scope, local_scopes, main_program, loss_var_name, scope, local_scopes,
allow_op_delay, use_default_grad_scale, allow_op_delay, use_default_grad_scale,
balance_parameter_opt_between_cards); balance_parameter_opt_between_cards, num_trainers, trainer_id);
}) })
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: even we return a vec<Scope*>* to Python use reference policy. // NOTE: even we return a vec<Scope*>* to Python use reference policy.
......
...@@ -489,7 +489,7 @@ class Operator(object): ...@@ -489,7 +489,7 @@ class Operator(object):
'rnn_memory_helper_grad', 'conditional_block', 'while', 'send', 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send',
'recv', 'listen_and_serv', 'parallel_do', 'save_combine', 'recv', 'listen_and_serv', 'parallel_do', 'save_combine',
'load_combine', 'ncclInit', 'channel_create', 'channel_close', 'load_combine', 'ncclInit', 'channel_create', 'channel_close',
'channel_send', 'channel_recv', 'select' 'channel_send', 'channel_recv', 'select', 'gen_nccl_id'
} }
if type not in no_kernel_op_set: if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc) self.desc.infer_var_type(self.block.desc)
......
...@@ -31,7 +31,9 @@ class ParallelExecutor(object): ...@@ -31,7 +31,9 @@ class ParallelExecutor(object):
allow_op_delay=False, allow_op_delay=False,
share_vars_from=None, share_vars_from=None,
use_default_grad_scale=True, use_default_grad_scale=True,
balance_parameter_opt_between_cards=False): balance_parameter_opt_between_cards=False,
num_trainers=1,
trainer_id=0):
""" """
ParallelExecutor can run program in parallel. ParallelExecutor can run program in parallel.
...@@ -55,6 +57,11 @@ class ParallelExecutor(object): ...@@ -55,6 +57,11 @@ class ParallelExecutor(object):
balance_parameter_opt_between_cards(bool, default True): Whether balance_parameter_opt_between_cards(bool, default True): Whether
updating different gradients on different cards. Currently, it updating different gradients on different cards. Currently, it
is not recommended. is not recommended.
num_trainers(int, default 1): If greater than 1, NCCL will be
initialized with multpile rank of nodes, each node should have
same number of GPUs. Distributed training will be enabled then.
trainer_id(int, default 0): Must use together with num_trainers.
trainer_id is the "rank" of current node starts from 0.
Returns: Returns:
A ParallelExecutor object. A ParallelExecutor object.
...@@ -134,8 +141,9 @@ class ParallelExecutor(object): ...@@ -134,8 +141,9 @@ class ParallelExecutor(object):
local_scopes, local_scopes,
allow_op_delay, allow_op_delay,
use_default_grad_scale, use_default_grad_scale,
balance_parameter_opt_between_cards) balance_parameter_opt_between_cards,
num_trainers,
trainer_id)
self.scope = scope self.scope = scope
def run(self, fetch_list, feed=None, feed_dict=None): def run(self, fetch_list, feed=None, feed_dict=None):
......
...@@ -6,4 +6,5 @@ foreach(src ${TEST_OPS}) ...@@ -6,4 +6,5 @@ foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py) py_test(${src} SRCS ${src}.py)
endforeach() endforeach()
add_subdirectory(fit_a_line)
add_subdirectory(recognize_digits) add_subdirectory(recognize_digits)
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
# default test
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid as fluid
import contextlib
import numpy
import unittest
# train reader
BATCH_SIZE = 20
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.train(), buf_size=500),
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.uci_housing.test(), buf_size=500),
batch_size=BATCH_SIZE)
def inference_program():
x = fluid.layers.data(name='x', shape=[13], dtype='float32')
y_predict = fluid.layers.fc(input=x, size=1, act=None)
return y_predict
def linear():
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
y_predict = inference_program()
loss = fluid.layers.square_error_cost(input=y_predict, label=y)
avg_loss = fluid.layers.mean(loss)
return avg_loss
def train(use_cuda, save_dirname):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
trainer = fluid.Trainer(
train_func=linear,
infer_func=inference_program,
place=place,
optimizer=fluid.optimizer.SGD(learning_rate=0.001))
def event_handler(event):
if isinstance(event, fluid.EndEpochEvent):
test_metrics = trainer.test(
reader=test_reader, feed_order=['x', 'y'])
print test_metrics
'''
...
['25.768919467926025']
['15.343549569447836']
...
'''
if float(test_metrics[0]) < 20.0:
if save_dirname is not None:
# NOT clear yet
# fluid.io.save_inference_model(save_dirname, ['x'], [y_predict])
# trainer.save_params(save_dirname)
# https://github.com/PaddlePaddle/Paddle/pull/10445
trainer.save_inference_model(save_dirname)
return
trainer.train(
reader=train_reader,
num_epochs=100,
event_handler=event_handler,
feed_order=['x', 'y'])
# infer
def infer(use_cuda, save_dirname=None):
if save_dirname is None:
return
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
inferencer = fluid.Inferencer(param_path=save_dirname, place=place)
batch_size = 10
tensor_x = numpy.random.uniform(0, 10, [batch_size, 13]).astype("float32")
results = inferencer.infer({'x': tensor_x})
print("infer results: ", results[0])
def main(use_cuda):
if use_cuda and not fluid.core.is_compiled_with_cuda():
return
# Directory for saving the trained model
save_dirname = "fit_a_line.inference.model"
train(use_cuda, save_dirname)
infer(use_cuda, save_dirname)
class TestFitALine(unittest.TestCase):
def test_cpu(self):
with self.program_scope_guard():
with fluid.unique_name.guard():
main(use_cuda=False)
def test_cuda(self):
with self.program_scope_guard():
with fluid.unique_name.guard():
main(use_cuda=True)
@contextlib.contextmanager
def program_scope_guard(self):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
yield
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册