From 9ad940fdfe865be0a9871ffee8b0f682044e67fc Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Thu, 12 Dec 2019 13:08:06 +0800 Subject: [PATCH] memory leak for cpu (#21174) * add fake init for the trainer, fix large memory hold in the trainer * do not merge recv vars from a remote endpoint, test=develop * add recv and save op, merge slice var in one op, save memory * remove hsigmoid with pull sparse, test=develop --- paddle/fluid/framework/lod_tensor.cc | 39 ++- paddle/fluid/framework/lod_tensor.h | 4 + paddle/fluid/framework/save_load_util.cc | 5 +- paddle/fluid/framework/tensor_util.cc | 76 ++++- paddle/fluid/framework/tensor_util.h | 3 + .../distributed/parameter_prefetch.cc | 25 +- .../distributed_lookup_table_op.cc | 9 +- .../operators/distributed_ops/fake_init_op.cc | 5 - .../operators/distributed_ops/recv_save_op.cc | 262 +++++++++++++++++ .../fluid/operators/hierarchical_sigmoid_op.h | 40 --- paddle/fluid/operators/load_op.cc | 8 +- paddle/fluid/operators/load_op.h | 14 +- paddle/fluid/string/to_string.h | 1 + python/paddle/fluid/io.py | 157 ++++------ .../fluid/tests/unittests/CMakeLists.txt | 6 +- .../tests/unittests/test_dist_transpiler.py | 112 +++++++- .../test_hsigmoid_remote_table_op.py | 271 ------------------ .../tests/unittests/test_recv_save_op.py | 265 +++++++++++++++++ .../tests/unittests/test_transpiler_ops.py | 143 +++++++++ .../fluid/transpiler/distribute_transpiler.py | 110 +++++-- 20 files changed, 1081 insertions(+), 474 deletions(-) create mode 100644 paddle/fluid/operators/distributed_ops/recv_save_op.cc delete mode 100644 python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py create mode 100644 python/paddle/fluid/tests/unittests/test_recv_save_op.py create mode 100644 python/paddle/fluid/tests/unittests/test_transpiler_ops.py diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index ca820068c4..ec5aa27be8 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -243,15 +243,48 @@ void SerializeToStream(std::ostream &os, const LoDTensor &tensor, TensorToStream(os, static_cast(tensor), dev_ctx); } +void DeserializeFromStream(std::istream &is, LoDTensor *tensor, + const platform::DeviceContext &dev_ctx, + const size_t &seek, + const std::vector &shape) { + { + // the 1st field, unit32_t version for LoDTensor + uint32_t version; + is.read(reinterpret_cast(&version), sizeof(version)); + PADDLE_ENFORCE_EQ(framework::IsTensorVersionSupported(version), true, + platform::errors::InvalidArgument( + "tensor version %u is not supported.", version)); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); + } + { + // the 2st field, LoD information + uint64_t lod_level; + is.read(reinterpret_cast(&lod_level), sizeof(lod_level)); + auto &lod = *tensor->mutable_lod(); + lod.resize(lod_level); + } + // the 3st filed, Tensor + TensorFromStream(is, static_cast(tensor), dev_ctx, seek, shape); +} + void DeserializeFromStream(std::istream &is, LoDTensor *tensor, const platform::DeviceContext &dev_ctx) { { // the 1st field, unit32_t version for LoDTensor uint32_t version; is.read(reinterpret_cast(&version), sizeof(version)); - PADDLE_ENFORCE(framework::IsTensorVersionSupported(version), - "tensor version %u is not supported.", version); - PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported"); + PADDLE_ENFORCE_EQ(framework::IsTensorVersionSupported(version), true, + platform::errors::InvalidArgument( + "tensor version %u is not supported.", version)); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); } { // the 2st field, LoD information diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index ef48753349..3ad873d1f6 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -209,6 +209,10 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, const platform::DeviceContext& dev_ctx); void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); +void DeserializeFromStream(std::istream& is, LoDTensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, + const std::vector& shape); /* * Convert between length-based LoD and offset-based LoD. diff --git a/paddle/fluid/framework/save_load_util.cc b/paddle/fluid/framework/save_load_util.cc index 76544af052..fbbbfd66b3 100644 --- a/paddle/fluid/framework/save_load_util.cc +++ b/paddle/fluid/framework/save_load_util.cc @@ -342,8 +342,9 @@ bool LoadTensorFromDisk( std::unique_ptr buf(new char[size]); fin.read(reinterpret_cast(buf.get()), size); CheckInStreamState(fin, sizeof(size)); - PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size), - "Cannot parse tensor desc"); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); } { // read tensor diff --git a/paddle/fluid/framework/tensor_util.cc b/paddle/fluid/framework/tensor_util.cc index ace6b033cc..8b2e3fc323 100644 --- a/paddle/fluid/framework/tensor_util.cc +++ b/paddle/fluid/framework/tensor_util.cc @@ -404,8 +404,9 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, uint64_t size = tensor.numel() * framework::SizeOfType(tensor.type()); auto* data_ptr = tensor.data(); - PADDLE_ENFORCE(size < std::numeric_limits::max(), - "Index overflow when writing tensor"); + PADDLE_ENFORCE_LT(size, std::numeric_limits::max(), + platform::errors::ResourceExhausted( + "tensor size %d overflow when writing tensor", size)); if (platform::is_gpu_place(tensor.place())) { #ifdef PADDLE_WITH_CUDA constexpr size_t kBufSize = 1024 * 1024 * 64; // 64MB @@ -426,7 +427,8 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, size -= size_to_write; } #else - PADDLE_THROW("Unexpected branch"); + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); #endif } else { os.write(static_cast(data_ptr), @@ -450,11 +452,69 @@ struct DeserializedDataFunctor { platform::Place place_; }; +void TensorFromStream(std::istream& is, Tensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, const std::vector& shape) { + uint32_t version; + is.read(reinterpret_cast(&version), sizeof(version)); + + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); + + proto::VarType::TensorDesc desc; + { // int32_t size + // proto buffer + int32_t size; + is.read(reinterpret_cast(&size), sizeof(size)); + std::unique_ptr buf(new char[size]); + is.read(reinterpret_cast(buf.get()), size); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); + } + { // read tensor + tensor->Resize(framework::make_ddim(shape)); + size_t seekg = seek * framework::SizeOfType(desc.data_type()); + is.seekg(seekg, is.cur); + + void* buf; + auto ctx = platform::CPUDeviceContext(); + size_t size = tensor->numel() * framework::SizeOfType(desc.data_type()); + if (platform::is_gpu_place(dev_ctx.GetPlace())) { +#ifdef PADDLE_WITH_CUDA + Tensor cpu_tensor; + cpu_tensor.Resize(framework::make_ddim(shape)); + framework::VisitDataType( + desc.data_type(), + DeserializedDataFunctor(&buf, &cpu_tensor, ctx.GetPlace())); + is.read(static_cast(buf), size); + auto dst_place = dev_ctx.GetPlace(); + framework::TensorCopy(cpu_tensor, dst_place, dev_ctx, tensor); +#else + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); +#endif + } else { + framework::VisitDataType( + desc.data_type(), + DeserializedDataFunctor(&buf, tensor, ctx.GetPlace())); + is.read(static_cast(buf), size); + } + } +} + void TensorFromStream(std::istream& is, Tensor* tensor, const platform::DeviceContext& dev_ctx) { uint32_t version; is.read(reinterpret_cast(&version), sizeof(version)); - PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported"); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); proto::VarType::TensorDesc desc; { // int32_t size // proto buffer @@ -462,8 +522,9 @@ void TensorFromStream(std::istream& is, Tensor* tensor, is.read(reinterpret_cast(&size), sizeof(size)); std::unique_ptr buf(new char[size]); is.read(reinterpret_cast(buf.get()), size); - PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size), - "Cannot parse tensor desc"); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); } { // read tensor std::vector dims; @@ -484,7 +545,8 @@ void TensorFromStream(std::istream& is, Tensor* tensor, auto dst_place = dev_ctx.GetPlace(); framework::TensorCopy(cpu_tensor, dst_place, dev_ctx, tensor); #else - PADDLE_THROW("Unexpected branch"); + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); #endif } else { framework::VisitDataType( diff --git a/paddle/fluid/framework/tensor_util.h b/paddle/fluid/framework/tensor_util.h index dd535dfb6b..9cbcf8f628 100644 --- a/paddle/fluid/framework/tensor_util.h +++ b/paddle/fluid/framework/tensor_util.h @@ -72,6 +72,9 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, const platform::DeviceContext& dev_ctx); void TensorFromStream(std::istream& is, Tensor* tensor, const platform::DeviceContext& dev_ctx); +void TensorFromStream(std::istream& is, Tensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, const std::vector& shape); // convert dlpack's DLTensor to tensor void TensorFromDLPack(const ::DLTensor& dl_tensor, framework::Tensor* dst); diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index b6ee2d29df..c9bbc3c193 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -183,9 +183,18 @@ void prefetchs(const std::vector& id_var_names, PADDLE_ENFORCE_EQ(table_names.size(), endpoints.size(), ""); PADDLE_ENFORCE_EQ(table_names.size(), height_sections.size(), ""); - auto* reconstruct_var = - scope.FindVar(persistable_var_name)->GetMutable(); - const auto vec_dim_1 = reconstruct_var->dims()[1]; + auto vec_dim_1 = 0; + framework::Variable* var = scope.FindVar(persistable_var_name); + + PADDLE_ENFORCE_EQ(var->IsType(), true, + platform::errors::InvalidArgument( + "prefetch can only support LodTensor only")); + + vec_dim_1 = var->Get().dims()[1]; + + PADDLE_ENFORCE_GT(vec_dim_1, 0, + platform::errors::InvalidArgument( + "lookup table var's dim must gather than 0")); const auto place = scope.FindVar(id_var_names[0])->Get().place(); @@ -251,16 +260,6 @@ void prefetchs(const std::vector& id_var_names, } } } - - if (backfill) { - VLOG(3) << "backfill persistable var's id with vecs"; - - auto* reconstruct_d = reconstruct_var->data(); - for (auto& id : ids_union) { - std::copy(recved_vec_map[id].begin(), recved_vec_map[id].end(), - reconstruct_d + id * vec_dim_1); - } - } } }; // namespace distributed diff --git a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc index 403347d334..f37d2b1eee 100644 --- a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc +++ b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc @@ -72,8 +72,9 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel { protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext &ctx) const override { - auto data_type = OperatorWithKernel::IndicateVarDataType(ctx, "W"); - return framework::OpKernelType(data_type, ctx.device_context()); + return framework::OpKernelType( + framework::proto::VarType::Type(ctx.Attr("dtype")), + ctx.GetPlace()); } }; @@ -139,6 +140,10 @@ class DistributedLookupTableOpMaker : public framework::OpProtoAndCheckerMaker { "Otherwise the given value indicates padding the output " "with zeros whenever lookup encounters it in Ids.") .SetDefault(distributed::kNoPadding); + AddAttr("dtype", + "(int, default 5 (FP32)) " + "Output data type") + .SetDefault(framework::proto::VarType::FP32); AddComment(R"DOC( Lookup Tablel Prefetch Operator. diff --git a/paddle/fluid/operators/distributed_ops/fake_init_op.cc b/paddle/fluid/operators/distributed_ops/fake_init_op.cc index b56ab1d5fd..f5a34b2dcb 100644 --- a/paddle/fluid/operators/distributed_ops/fake_init_op.cc +++ b/paddle/fluid/operators/distributed_ops/fake_init_op.cc @@ -1,11 +1,8 @@ /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,10 +66,8 @@ class FakeInitOpMaker : public framework::OpProtoAndCheckerMaker { "with the specified value"); AddComment(R"DOC( FakeInit Operator. - Init an variable but not alloc memory for it, it is used for init the table parameter at trainer side in distributed lookup table. - )DOC"); } }; diff --git a/paddle/fluid/operators/distributed_ops/recv_save_op.cc b/paddle/fluid/operators/distributed_ops/recv_save_op.cc new file mode 100644 index 0000000000..766e378e10 --- /dev/null +++ b/paddle/fluid/operators/distributed_ops/recv_save_op.cc @@ -0,0 +1,262 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include +#include +#include + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/data_type_transform.h" +#include "paddle/fluid/framework/framework.pb.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/framework/version.h" +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/parameter_recv.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace operators { +class RecvSaveOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext *ctx) const override {} + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext &ctx) const override { + return framework::OpKernelType( + framework::proto::VarType::Type(ctx.Attr("dtype")), + ctx.GetPlace()); + } +}; + +class RecvSaveOpProtoMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddComment(R"DOC( +Recv Save operator + +This operator will serialize and write LoDTensor variable to file on disk. +)DOC"); + AddAttr("dtype", + "(int, default 5 (FP32)) " + "Output data type") + .SetDefault(framework::proto::VarType::FP32); + + AddAttr("overwrite", + "(boolean, default true)" + "Overwrite the output file if exist") + .SetDefault(true); + + AddAttr("file_path", + "(string)" + "The \"file_path\" where the variable will be saved.") + .AddCustomChecker( + [](const std::string &path) { return !path.empty(); }); + + AddAttr>("shape", + "(vector) The shape of the output") + .SetDefault({}); + + AddAttr>( + "slice_varnames", + "(string vector, default {}) " + "sometimes we need to put received var in another name " + "for example: we need var named 'moment_1@127.0.0.1:1001', " + "and it real name on parameter server is 'moment_1'. ") + .SetDefault({}); + + AddAttr>( + "remote_varnames", + "(string vector, default {}) " + "sometimes we need to put received var in another name " + "for example: we need var named 'moment_1@127.0.0.1:1001', " + "and it real name on parameter server is 'moment_1'. ") + .SetDefault({}); + + AddAttr>("slice_shapes", + "(vector) " + "the length of each output along the " + "specified axis.") + .SetDefault({}); + + AddAttr>("endpoints", + "(string vector, default 127.0.0.1:6164)" + "Server endpoints in the order of input " + "variables for mapping") + .SetDefault({}); + + AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); + } +}; + +template +class RecvSaveOpKernel : public framework::OpKernel { + private: + void SerializeVersionToStream(std::ostream &os) const { + { // the 1st field, uint32_t version for LoDTensor + os.write(reinterpret_cast(&framework::kCurTensorVersion), + sizeof(framework::kCurTensorVersion)); + } + // the 2st field, LoD information + // in this scene, skip LoD information. + uint64_t size = 0; + os.write(reinterpret_cast(&size), sizeof(size)); + } + + void SerializeTensorHeaderToStream( + std::ostream &os, const framework::proto::VarType::Type &type, + const framework::DDim &dims) const { + { // the 1st field, uint32_t version + constexpr uint32_t version = 0; + os.write(reinterpret_cast(&version), sizeof(version)); + } + { // the 2nd field, tensor description + // int32_t size + // void* protobuf message + framework::proto::VarType::TensorDesc desc; + desc.set_data_type(type); + auto tensor_dims = framework::vectorize(dims); + auto *pb_dims = desc.mutable_dims(); + pb_dims->Resize(static_cast(tensor_dims.size()), 0); + std::copy(tensor_dims.begin(), tensor_dims.end(), pb_dims->begin()); + int32_t size = desc.ByteSize(); + os.write(reinterpret_cast(&size), sizeof(size)); + auto out = desc.SerializeAsString(); + os.write(out.data(), size); + } + } + + void SerializeTensorAppendToStream(std::ostream &os, + const framework::Tensor &tensor) const { + uint64_t size = tensor.numel() * framework::SizeOfType(tensor.type()); + auto *data_ptr = tensor.data(); + + PADDLE_ENFORCE_LT(size, std::numeric_limits::max(), + platform::errors::ResourceExhausted( + "tensor size %d overflow when writing tensor", size)); + os.write(static_cast(data_ptr), + static_cast(size)); + } + + public: + void Compute(const framework::ExecutionContext &ctx) const override { + auto place = ctx.GetPlace(); + + auto filename = ctx.Attr("file_path"); + auto overwrite = ctx.Attr("overwrite"); + + if (FileExists(filename) && !overwrite) { + PADDLE_THROW_ERROR( + "%s is existed, cannot save to it when overwrite=false", filename, + overwrite); + } + + MkDirRecursively(DirName(filename).c_str()); + + auto origin_shape = ctx.Attr>("shape"); + auto slice_shapes = ctx.Attr>("slice_shapes"); + auto slice_varnames = ctx.Attr>("slice_varnames"); + auto remote_varnames = + ctx.Attr>("remote_varnames"); + auto endpoints = ctx.Attr>("endpoints"); + + PADDLE_ENFORCE_EQ(slice_shapes.size(), slice_varnames.size(), + platform::errors::InvalidArgument( + "Expected attr len(slice_shapes) must be equal to " + "len(slice_varnames)")); + + PADDLE_ENFORCE_EQ( + slice_shapes.size(), endpoints.size(), + platform::errors::InvalidArgument( + "Expected attr len(slice_shapes) must be equal to len(endpoints)")); + + auto data_type = + static_cast(ctx.Attr("dtype")); + + // it to save an output stream. + std::ofstream fout(filename, std::ios::binary); + PADDLE_ENFORCE_EQ( + static_cast(fout), true, + platform::errors::NotFound("Cannot open %s to write", filename)); + + SerializeVersionToStream(fout); + SerializeTensorHeaderToStream(fout, data_type, + framework::make_ddim(origin_shape)); + + framework::Scope &local_scope = ctx.scope().NewScope(); + + auto trainer_id = ctx.Attr("trainer_id"); + + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &device_ctx = *pool.Get(place); + + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(trainer_id); + + for (size_t i = 0; i < slice_varnames.size(); i++) { + auto &varname = slice_varnames[i]; + auto *var = local_scope.Var(varname); + auto *tensor = var->GetMutable(); + + auto slice_string = + string::split_string(slice_shapes[i], ","); + std::vector slice_shape; + + for (auto &dim : slice_string) { + slice_shape.push_back(static_cast(std::stoull(dim))); + } + + tensor->Resize(framework::make_ddim(slice_shape)); + + distributed::VarHandlePtr ret; + + ret = rpc_client->AsyncGetVarNoBarrier( + endpoints[i], device_ctx, local_scope, remote_varnames[i], varname); + + PADDLE_ENFORCE_NE( + ret->Wait(), 0U, + platform::errors::ExecutionTimeout( + "rpc error when communication with %s", endpoints[i])); + + auto &c_tensor = var->Get(); + + SerializeTensorAppendToStream(fout, c_tensor); + local_scope.EraseVars({varname}); + } + + fout.close(); + ctx.scope().DeleteScope(&local_scope); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(recv_save, ops::RecvSaveOp, ops::RecvSaveOpProtoMaker); + +REGISTER_OP_CPU_KERNEL( + recv_save, ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel); diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h index daf99586f8..7527736704 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.h +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h @@ -68,46 +68,6 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel { size_t num_classes = static_cast(ctx.Attr("num_classes")); // for remote prefetch - auto remote_prefetch = ctx.Attr("remote_prefetch"); - auto epmap = ctx.Attr>("epmap"); - if (remote_prefetch && !epmap.empty()) { - // if epmap is not empty, then the parameter will be fetched from remote - // parameter - // server - auto height_sections = ctx.Attr>("height_sections"); - auto table_names = ctx.Attr>("table_names"); - std::vector real_rows = PathToRows(*path); - framework::Scope& local_scope = ctx.scope().NewScope(); - auto* ids = local_scope.Var("Ids@Prefetch"); - auto* x_tensor = ids->GetMutable(); - - x_tensor->mutable_data( - framework::make_ddim({static_cast(real_rows.size()), 1}), - ctx.GetPlace()); - // copy. - - std::memcpy(x_tensor->data(), real_rows.data(), - real_rows.size() * sizeof(int64_t)); - - framework::DDim w_dims = ctx.Input("W")->dims(); - w_dims[0] = x_tensor->dims()[0]; - auto* w_tensor = - local_scope.Var("W@Prefetch")->GetMutable(); - w_tensor->Resize(w_dims); - -#ifdef PADDLE_WITH_DISTRIBUTE - // w_Out is set to used by prefetch, never change it in other cases - auto weight = ctx.OutputNames("W_Out").front(); - operators::distributed::prefetch("Ids@Prefetch", "W@Prefetch", weight, - true, table_names, epmap, - height_sections, ctx, local_scope); -#else - PADDLE_THROW( - "paddle is not compiled with distribute support, can not do " - "parameter prefetch!"); -#endif - } - bool is_custom = false; if (path) { is_custom = true; diff --git a/paddle/fluid/operators/load_op.cc b/paddle/fluid/operators/load_op.cc index 435c755df3..4f2c9a6ca0 100644 --- a/paddle/fluid/operators/load_op.cc +++ b/paddle/fluid/operators/load_op.cc @@ -48,8 +48,14 @@ class LoadOpProtoMaker : public framework::OpProtoAndCheckerMaker { R"(Variable will be loaded from "file_path")") .AddCustomChecker( [](const std::string &path) { return !path.empty(); }); + AddAttr("seek", "(int64_t) Starting for load tensor from seek pos") + .SetDefault(-1); + AddAttr>("shape", + "(vector) The shape of the output") + .SetDefault({}); AddComment( - "Load operator will load a LoDTensor / SelectedRows variable from disk " + "Load operator will load a LoDTensor / SelectedRows variable from " + "disk " "file."); } }; diff --git a/paddle/fluid/operators/load_op.h b/paddle/fluid/operators/load_op.h index 7347fd5e05..5a409502fd 100644 --- a/paddle/fluid/operators/load_op.h +++ b/paddle/fluid/operators/load_op.h @@ -16,6 +16,7 @@ limitations under the License. */ #include #include +#include #include "paddle/fluid/framework/data_type_transform.h" #include "paddle/fluid/framework/op_registry.h" @@ -63,7 +64,18 @@ class LoadOpKernel : public framework::OpKernel { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(place); auto *tensor = var->GetMutable(); - DeserializeFromStream(fin, tensor, dev_ctx); + + auto seek = ctx.Attr("seek"); + + if (seek != -1) { + PADDLE_ENFORCE_GE(seek, 0, + platform::errors::InvalidArgument( + "seek witn tensor must great than or equal to 0")); + auto shape = ctx.Attr>("shape"); + DeserializeFromStream(fin, tensor, dev_ctx, seek, shape); + } else { + DeserializeFromStream(fin, tensor, dev_ctx); + } auto load_as_fp16 = ctx.Attr("load_as_fp16"); auto in_dtype = tensor->type(); diff --git a/paddle/fluid/string/to_string.h b/paddle/fluid/string/to_string.h index f5693ee7f1..7b3332861e 100644 --- a/paddle/fluid/string/to_string.h +++ b/paddle/fluid/string/to_string.h @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include namespace paddle { namespace string { diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 6f58a700a5..eed3b79591 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -30,7 +30,8 @@ from paddle.reader import * from paddle.fluid import layers from paddle.fluid.executor import Executor, global_scope from paddle.fluid.evaluator import Evaluator -from paddle.fluid.framework import Program, Parameter, default_main_program, default_startup_program, Variable, program_guard +from paddle.fluid.framework import Program, Parameter, default_main_program, default_startup_program, Variable, \ + program_guard from paddle.fluid.compiler import CompiledProgram from paddle.fluid.log_helper import get_logger from . import reader @@ -383,72 +384,42 @@ def _save_distributed_persistables(executor, dirname, main_program): # recv optimize vars from pserver for name, remote_params in remote_params_map.items(): - origin_var = None - is_slice = False - slice_vars = [0] * len(remote_params) - slice_var_names = [""] * len(remote_params) - endpoints = [""] * len(remote_params) + origin = remote_params[0].origin + is_slice = remote_params[0].is_slice + + slices = [None] * len(remote_params) + slice_varnames = [None] * len(remote_params) + remote_varnames = [None] * len(remote_params) + endpoints = [None] * len(remote_params) for idx, optimizer in enumerate(remote_params): - origin = optimizer.origin - slice = optimizer.slice - is_slice = optimizer.is_slice block_id = optimizer.block_id + slice = optimizer.slice endpoint = optimizer.endpoint - if idx == 0: - origin_var = block.create_var( - name=origin.name, - type=origin.type, - shape=origin.shape, - dtype=origin.dtype, - persistable=True) - - slice_var = block.create_var( - name="{}.slice.{}".format(slice.name, idx), - type=slice.type, - shape=slice.shape, - dtype=slice.dtype, - persistable=True) - index = block_id if is_slice else idx - slice_vars[index] = slice_var - slice_var_names[index] = slice.name + slices[index] = slice + slice_varnames[index] = "{}.slice.{}".format(slice.name, idx) + remote_varnames[index] = slice.name endpoints[index] = endpoint - if is_slice: - block.append_op( - type='recv', - inputs={"X": []}, - outputs={"Out": slice_vars}, - attrs={ - "epmap": endpoints, - "with_barrier": False, - "varnames": slice_var_names, - "sync_mode": True - }) - block.append_op( - type='concat', - inputs={'X': slice_vars}, - outputs={'Out': origin_var}, - attrs={}) - else: - block.append_op( - type='recv', - inputs={"X": []}, - outputs={"Out": [origin_var]}, - attrs={ - "epmap": endpoints[:1], - "with_barrier": False, - "varnames": slice_var_names, - "sync_mode": True - }) + slice_shapes = [] + for slice in slices: + tmp = [str(dim) for dim in slice.shape] + slice_shapes.append(",".join(tmp)) + block.append_op( - type='save', - inputs={'X': [origin_var]}, - outputs={}, - attrs={'file_path': os.path.join(dirname, origin_var.name)}) - block.append_op(type='delete_var', inputs={'X': slice_vars}) + type='recv_save', + attrs={ + "trainer_id": 0, + "shape": origin.shape, + "slice_shapes": slice_shapes, + "slice_varnames": slice_varnames, + "remote_varnames": remote_varnames, + "endpoints": endpoints, + "file_path": os.path.join(dirname, origin.name) + }) + executor.run(prog) def __save_distributed_lookup_tables(executor, dirname, @@ -478,8 +449,8 @@ def _save_distributed_persistables(executor, dirname, main_program): if var.name in exclude_var_names: return False if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ - var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ - var.desc.type() == core.VarDesc.VarType.READER: + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: return False return var.persistable @@ -690,7 +661,7 @@ def load_vars(executor, if not isinstance(main_program, Program): raise TypeError("program should be as Program type or None") - #save origin param shape + # save origin param shape orig_para_shape = {} load_var_map = {} for each_var in vars: @@ -725,7 +696,7 @@ def load_vars(executor, attrs={'file_path': os.path.join(load_dirname, filename)}) executor.run(load_prog) - #check var shape + # check var shape for each_var in vars: if not isinstance(each_var, Parameter): continue @@ -893,21 +864,6 @@ def _load_distributed_persistables(executor, dirname, main_program=None): offset = param.offset if is_slice: - origin = load_block.create_var( - name="{}.load".format(origin_var.name), - type=origin_var.type, - shape=origin_var.shape, - dtype=origin_var.dtype, - persistable=True) - - load_block.append_op( - type='load', - inputs={}, - outputs={'Out': [origin]}, - attrs={ - 'file_path': os.path.join(dirname, origin_var.name) - }) - slice = load_block.create_var( name=slice_var.name, type=slice_var.type, @@ -915,22 +871,15 @@ def _load_distributed_persistables(executor, dirname, main_program=None): dtype=slice_var.dtype, persistable=True) - dim1_flatten = 1 - if len(slice.shape) >= 2: - dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:]) - - start = int(offset / dim1_flatten) - end = int(offset / dim1_flatten + slice.shape[0]) - load_block.append_op( - type="slice", - inputs={'Input': origin}, - outputs={'Out': slice}, - attrs={'axes': [0], - 'starts': [start], - 'ends': [end]}) - - need_delete_vars.append(origin) + type='load', + inputs={}, + outputs={'Out': [slice]}, + attrs={ + 'file_path': os.path.join(dirname, origin_var.name), + 'seek': offset, + 'shape': slice.shape + }) else: origin = load_block.create_var( name="{}".format(origin_var.name), @@ -1517,7 +1466,7 @@ def save(program, model_path): base_name = os.path.basename(model_path) assert base_name != "", \ - "model_path MUST be format of dirname/filename [dirname\\filename in Window], Now filename is empty str" + "model_path MUST be format of dirname/filename [dirname\\filename in Window], Now filename is empty str" def get_tensor(var): t = global_scope().find_var(var.name).get_tensor() @@ -1574,7 +1523,7 @@ def load(program, model_path, executor=None): parameter_file_name = model_path + ".pdparams" assert os.path.exists(parameter_file_name), \ - "Parameter file [{}] not exits".format(parameter_file_name) + "Parameter file [{}] not exits".format(parameter_file_name) def set_var(var, ndarray): t = global_scope().find_var(var.name).get_tensor() @@ -1610,7 +1559,7 @@ def load(program, model_path, executor=None): if len(optimizer_var_list) > 0: opt_file_name = model_path + ".pdopt" assert os.path.exists(opt_file_name), \ - "Optimizer file [{}] not exits".format( opt_file_name) + "Optimizer file [{}] not exits".format(opt_file_name) if executor: paddle.fluid.core._create_loaded_parameter( @@ -1655,7 +1604,7 @@ def load_program_state(model_path): """ parameter_file_name = model_path + ".pdparams" assert os.path.exists(parameter_file_name), \ - "Parameter file [{}] not exits".format( parameter_file_name) + "Parameter file [{}] not exits".format(parameter_file_name) with open(parameter_file_name, 'rb') as f: para_dict = pickle.load(f) @@ -1707,25 +1656,25 @@ def set_program_state(program, state_dict): for para in parameter_list: var_temp = paddle.fluid.global_scope().find_var(para.name) assert var_temp != None, \ - "Variable [ {} ] Not found, Please make sure run startup program".format( para.name ) + "Variable [ {} ] Not found, Please make sure run startup program".format(para.name) if para.name in state_dict: # set value from state dict orig_para_np = np.array(var_temp.get_tensor()) new_para_np = state_dict[para.name] - assert orig_para_np.shape == new_para_np.shape, \ - "Shape not matching: the Program requires a parameter with a shape of ({}), " \ - "while the loaded parameter (namely [ {} ]) has a shape of ({})." \ + assert orig_para_np.shape == new_para_np.shape, \ + "Shape not matching: the Program requires a parameter with a shape of ({}), " \ + "while the loaded parameter (namely [ {} ]) has a shape of ({})." \ .format(orig_para_np.shape, para.name, new_para_np.shape) - assert orig_para_np.dtype == new_para_np.dtype, \ - "Dtype not matching: the Program requires a parameter with a dtype of ({}), " \ - "while the loaded parameter (namely [ {} ]) has a dtype of ({})." \ + assert orig_para_np.dtype == new_para_np.dtype, \ + "Dtype not matching: the Program requires a parameter with a dtype of ({}), " \ + "while the loaded parameter (namely [ {} ]) has a dtype of ({})." \ .format(orig_para_np.dtype, para.name, new_para_np.dtype) ten = var_temp.get_tensor() ten_place = ten._place() assert ten_place.is_gpu_place() or ten_place.is_cpu_place(), \ - "Place not support, only support CPUPlace and GPUPlace, now is {}".format( str(ten_place)) + "Place not support, only support CPUPlace and GPUPlace, now is {}".format(str(ten_place)) py_place = paddle.fluid.CPUPlace() if ten_place.is_cuda_pinned_place(): place = paddle.fluid.CUDAPinnedPlace() diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index f4f11168d2..9614aa72f9 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,7 +15,8 @@ list(APPEND MIXED_DIST_TEST_OPS test_dgc_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_simple_dist_transpiler) list(APPEND MIXED_DIST_TEST_OPS test_listen_and_serv_op) list(APPEND MIXED_DIST_TEST_OPS test_nce_remote_table_op) -list(APPEND MIXED_DIST_TEST_OPS test_hsigmoid_remote_table_op) +list(APPEND MIXED_DIST_TEST_OPS test_recv_save_op) +list(APPEND MIXED_DIST_TEST_OPS test_transpiler_ops) list(APPEND MIXED_DIST_TEST_OPS test_lookup_remote_table_op) list(APPEND MIXED_DIST_TEST_OPS test_launch) list(APPEND MIXED_DIST_TEST_OPS test_launch_ps) @@ -252,8 +253,9 @@ if(WITH_DISTRIBUTE) list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_base") py_test_modules(test_lookup_remote_table_op MODULES test_lookup_remote_table_op ENVS ${dist_ENVS}) - py_test_modules(test_hsigmoid_remote_table_op MODULES test_hsigmoid_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS}) + py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) + py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS}) if(WITH_DGC) py_test_modules(test_dgc_op MODULES test_dgc_op) py_test_modules(test_dgc_momentum_op MODULES test_dgc_momentum_op) diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index f81d4fda50..a2e95f0d65 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -23,6 +23,7 @@ import unittest import numpy as np import gc + gc.set_debug(gc.DEBUG_COLLECTABLE) import paddle.fluid as fluid @@ -274,6 +275,115 @@ class TestLRDecay(TranspilerTest): ]) +class TestFakeInit(TranspilerTest): + def net_conf(self): + dict_size, embedding_size, neg_num = 10000, 8, 5 + + input_word = fluid.layers.data( + name="input_word", shape=[1], dtype='int64', lod_level=1) + true_word = fluid.layers.data( + name='true_label', shape=[1], dtype='int64', lod_level=1) + neg_word = fluid.layers.data( + name="neg_label", shape=[1], dtype='int64', lod_level=1) + inputs = [input_word, true_word, neg_word] + + init_width = 0.5 / embedding_size + input_emb = fluid.layers.embedding( + input=inputs[0], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb', + initializer=fluid.initializer.Uniform(-init_width, init_width))) + + true_emb_w = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', + initializer=fluid.initializer.Constant(value=0.0))) + + true_emb_b = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', + initializer=fluid.initializer.Constant(value=0.0))) + + neg_word_reshape = fluid.layers.reshape(inputs[2], shape=[-1, 1]) + neg_word_reshape.stop_gradient = True + + neg_emb_w = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', learning_rate=1.0)) + + neg_emb_w_re = fluid.layers.reshape( + neg_emb_w, shape=[-1, neg_num, embedding_size]) + + neg_emb_b = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', learning_rate=1.0)) + + neg_emb_b_vec = fluid.layers.reshape(neg_emb_b, shape=[-1, neg_num]) + + true_logits = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + fluid.layers.elementwise_mul(input_emb, true_emb_w), + dim=1, + keep_dim=True), + true_emb_b) + + input_emb_re = fluid.layers.reshape( + input_emb, shape=[-1, 1, embedding_size]) + + neg_matmul = fluid.layers.matmul( + input_emb_re, neg_emb_w_re, transpose_y=True) + neg_matmul_re = fluid.layers.reshape(neg_matmul, shape=[-1, neg_num]) + neg_logits = fluid.layers.elementwise_add(neg_matmul_re, neg_emb_b_vec) + # nce loss + label_ones = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, 1], value=1.0, dtype='float32') + label_zeros = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, neg_num], value=0.0, dtype='float32') + + true_xent = fluid.layers.sigmoid_cross_entropy_with_logits(true_logits, + label_ones) + neg_xent = fluid.layers.sigmoid_cross_entropy_with_logits(neg_logits, + label_zeros) + cost = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + true_xent, dim=1), + fluid.layers.reduce_sum( + neg_xent, dim=1)) + avg_cost = fluid.layers.reduce_mean(cost) + + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=1.0, + decay_steps=2100, + decay_rate=0.1, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + + def transpiler_test_impl(self): + trainer, startup = self.get_trainer() + + fake_init_ops = [] + for op in startup.global_block().ops: + if op.type == "fake_init": + fake_init_ops.append(op) + + self.assertEqual(len(fake_init_ops), 3) + + class TestDecayedAdagrad(TranspilerTest): def net_conf(self): x = fluid.layers.data(name='x', shape=[1000], dtype='float32') @@ -788,7 +898,7 @@ class TestLoadSliceVar(TranspilerTest): class TestNCCL2Transpile(TranspilerTest): def test_nccl2_transpile(self): - if fluid.core.is_compiled_with_cuda(): #test nccl2 only with cuda + if fluid.core.is_compiled_with_cuda(): # test nccl2 only with cuda main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): diff --git a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py deleted file mode 100644 index 014d30486d..0000000000 --- a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -import signal -import time -import unittest -from multiprocessing import Process - -import numpy as np -import paddle.fluid as fluid -import paddle.fluid.core as core -from paddle.fluid.op import Operator -from paddle.fluid.framework import Program, program_guard -from dist_test_utils import * - - -def run_pserver(pserver_id, use_cuda, sync_mode): - remove_ps_flag(os.getpid()) - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - # create table parameter in scope - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - # create and initialize Param Variable - param = scope.var('table').get_tensor() - - param_array = np.ones((5, 8)).astype("float32") - for i in range(len(param_array)): - param_array[i] *= param_array[i] * i + pserver_id * 10 + 1 - param.set(param_array, place) - - optimize_block = program._create_block(program.global_block().idx) - program.global_block().append_op( - type="listen_and_serv", - inputs={'X': []}, - outputs={}, - attrs={ - "optimize_blocks": [optimize_block], - "endpoint": '127.0.0.1:0', - "Fanin": 1, - "sync_mode": True, - "grad_to_block_id": [] - }) - - exe = fluid.Executor(place) - exe.run(program) - - -class TestListenAndServOp(unittest.TestCase): - def setUp(self): - self.ps_timeout = 5 - - def _start_pserver(self, pserver_id, use_cuda, sync_mode, pserver_func): - p = Process(target=pserver_func, args=(pserver_id, use_cuda, sync_mode)) - p.daemon = True - p.start() - return p - - def _wait_ps_ready(self, pid): - start_left_time = self.ps_timeout - sleep_time = 0.5 - while True: - assert start_left_time >= 0, "wait ps ready failed" - time.sleep(sleep_time) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - start_left_time -= sleep_time - - def _get_pserver_port(self, pid): - with open("/tmp/paddle.%d.port" % pid, 'r') as f: - port = int(f.read().strip()) - return port - - def _run_hsigmoid_op_one_pserver(self, place, port): - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - x = scope.var('X').get_tensor() - x_array = np.random.random((4, 8)).astype("float32") * 2 - x.set(x_array, place) - # create and initialize Param Variable - param = scope.var('W').get_tensor() - param_array = np.zeros((5, 8)).astype("float32") * 2 - param.set(param_array, place) - - path_table = scope.var('PathTable').get_tensor() - path_table_array = np.array( - [(0, 2, -1, -1, -1), (0, 1, 2, -1, -1), (0, 1, 4, -1, -1), - (0, 2, -1, -1, -1)]).astype( - "int64" - ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) - path_table.set(path_table_array, place) - - path_code = scope.var('PathCode').get_tensor() - path_code_array = np.array( - [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), - (0, 1, -1, -1, -1)]).astype("int64") #np.array to store - path_code.set(path_code_array, place) - - label = scope.var('Label').get_tensor() - label_array = np.array([0, 1, 4, 5]) - label.set(label_array, place) - - bias = scope.var('Bias').get_tensor() - bias_array = np.random.random((5, 1)).astype("float32") - bias.set(bias_array, place) - - out = scope.var('Out').get_tensor() - - pre_out = scope.var('PreOut').get_tensor - - w_out = scope.var('W_Out').get_tensor() - w_out.set(param_array, place) - - emaps = ['127.0.0.1:' + str(port)] - table_names = ['table'] - height_sections = [2] - - # create and run sgd operator - hsigmoid_op = Operator( - "hierarchical_sigmoid", - X='X', - W='W', - PathTable='PathTable', - PathCode='PathCode', - Label='Label', - Bias='Bias', - Out='Out', - PreOut='PreOut', - W_Out='W_Out', - remote_prefetch=True, - epmap=emaps, - table_names=table_names, - height_sections=height_sections) - - hsigmoid_op.run(scope, place) - - # get and compare result - result_array = np.array(w_out) - self.assertEqual(list(result_array.shape), [5, 8]) - correct = None - for i in range(5): - if i != 3: - correct = np.full((1, 8), i + 1).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - else: - correct = np.full((1, 8), 0).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - - def _run_hsigmoid_op_two_pserver(self, place, port0, port1): - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - x = scope.var('X').get_tensor() - x_array = np.random.random((4, 8)).astype("float32") * 2 - x.set(x_array, place) - # create and initialize Param Variable - param = scope.var('W').get_tensor() - param_array = np.zeros((5, 8)).astype("float32") * 2 - param.set(param_array, place) - - path_table = scope.var('PathTable').get_tensor() - path_table_array = np.array( - [(0, 2, -1, -1, -1), (0, 1, 3, -1, -1), (0, 1, 4, -1, -1), - (0, 2, -1, -1, -1)]).astype( - "int64" - ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) - path_table.set(path_table_array, place) - - path_code = scope.var('PathCode').get_tensor() - path_code_array = np.array( - [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), - (0, 1, -1, -1, -1)]).astype("int64") #np.array to store - path_code.set(path_code_array, place) - - label = scope.var('Label').get_tensor() - label_array = np.array([0, 1, 4, 5]) - label.set(label_array, place) - - bias = scope.var('Bias').get_tensor() - bias_array = np.random.random((5, 1)).astype("float32") - bias.set(bias_array, place) - - out = scope.var('Out').get_tensor() - - pre_out = scope.var('PreOut').get_tensor - - w_out = scope.var('W_Out').get_tensor() - w_out.set(param_array, place) - - emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)] - table_names = ['table', 'table'] - height_sections = [2, 3] - - # create and run sgd operator - hsigmoid_op = Operator( - "hierarchical_sigmoid", - X='X', - W='W', - PathTable='PathTable', - PathCode='PathCode', - Label='Label', - Bias='Bias', - Out='Out', - PreOut='PreOut', - W_Out='W_Out', - remote_prefetch=True, - epmap=emaps, - table_names=table_names, - height_sections=height_sections) - hsigmoid_op.run(scope, place) - - # get and compare result - result_array = np.array(w_out) - self.assertEqual(list(result_array.shape), [5, 8]) - correct = None - for i in range(5): - if i < 2: - correct = np.full((1, 8), i + 1).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - else: - correct = np.full((1, 8), i + 9).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - - def test_hsigmoid_op_remote(self): - os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1" - # run pserver on CPU in sync mode - p0 = self._start_pserver(0, False, True, run_pserver) - self._wait_ps_ready(p0.pid) - port0 = self._get_pserver_port(p0.pid) - - p1 = self._start_pserver(1, False, True, run_pserver) - self._wait_ps_ready(p1.pid) - port1 = self._get_pserver_port(p1.pid) - - places = [core.CPUPlace()] - - for place in places: - self._run_hsigmoid_op_one_pserver(place, port0) - self._run_hsigmoid_op_two_pserver(place, port0, port1) - - # raise SIGTERM to pserver - os.kill(p0.pid, signal.SIGINT) - p0.join() - os.kill(p1.pid, signal.SIGINT) - p1.join() - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_recv_save_op.py b/python/paddle/fluid/tests/unittests/test_recv_save_op.py new file mode 100644 index 0000000000..a8324a6a02 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_recv_save_op.py @@ -0,0 +1,265 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import signal +import time +import shutil +import unittest + +from multiprocessing import Process + +import numpy as np +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid.op import Operator +from paddle.fluid.framework import Program, program_guard +from paddle.fluid.transpiler.details import VarStruct, VarsDistributed +from dist_test_utils import * + + +def run_pserver(pserver_id): + remove_ps_flag(os.getpid()) + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + # create table parameter in scope + place = fluid.CPUPlace() + # create and initialize Param Variable + param = scope.var('table').get_tensor() + + param_array = np.ones((5, 8)).astype("float32") + for i in range(len(param_array)): + param_array[i] *= param_array[i] * i + pserver_id * 10 + 1 + param.set(param_array, place) + + optimize_block = program._create_block(program.global_block().idx) + program.global_block().append_op( + type="listen_and_serv", + inputs={'X': []}, + outputs={}, + attrs={ + "optimize_blocks": [optimize_block], + "endpoint": '127.0.0.1:0', + "Fanin": 1, + "sync_mode": True, + "grad_to_block_id": [] + }) + + exe = fluid.Executor(place) + exe.run(program) + + +class TestListenAndServOp(unittest.TestCase): + def setUp(self): + self.ps_timeout = 5 + + def _start_pserver(self, pserver_id, pserver_func): + p = Process(target=pserver_func, args=(pserver_id, )) + p.daemon = True + p.start() + return p + + def _wait_ps_ready(self, pid): + start_left_time = self.ps_timeout + sleep_time = 0.5 + while True: + assert start_left_time >= 0, "wait ps ready failed" + time.sleep(sleep_time) + try: + # the listen_and_serv_op would touch a file which contains the listen port + # on the /tmp directory until it was ready to process all the RPC call. + os.stat("/tmp/paddle.%d.port" % pid) + return + except os.error: + start_left_time -= sleep_time + + def _get_pserver_port(self, pid): + with open("/tmp/paddle.%d.port" % pid, 'r') as f: + port = int(f.read().strip()) + return port + + def _run_nce_op_two_pserver(self, place, port0, port1, model_file): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)] + + # create and run recv and save operator + remote_recv_op = Operator( + "recv_save", + trainer_id=0, + shape=[10, 8], + slice_shapes=["5,8", "5,8"], + slice_varnames=["table", "table"], + remote_varnames=['table', 'table'], + endpoints=emaps, + file_path=model_file) + + remote_recv_op.run(scope, place) + + def _load_slice_var(self, model_file): + load_prog = fluid.Program() + load_block = load_prog.global_block() + + origin = load_block.create_var( + name="var.origin", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[10, 8], + dtype="float32", + persistable=True) + + slice0 = load_block.create_var( + name="var.slice0", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[3, 8], + dtype="float32", + persistable=True) + + slice1 = load_block.create_var( + name="var.slice1", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + persistable=True) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [origin]}, + attrs={'file_path': model_file}) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [slice0]}, + attrs={ + 'file_path': model_file, + 'seek': 2 * 8, + 'shape': slice0.shape + }) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [slice1]}, + attrs={ + 'file_path': model_file, + 'seek': 5 * 8, + 'shape': slice1.shape + }) + + exe = fluid.Executor(place=fluid.CPUPlace()) + exe.run(load_prog) + + origin_var = fluid.global_scope().find_var("var.origin") + slice0_var = fluid.global_scope().find_var("var.slice0") + slice1_var = fluid.global_scope().find_var("var.slice1") + + origin = np.array(origin_var.get_tensor()) + slice0 = np.array(slice0_var.get_tensor()) + slice1 = np.array(slice1_var.get_tensor()) + + np.testing.assert_equal(origin[2:5], slice0) + np.testing.assert_equal(origin[5:10], slice1) + + def _save_by_io_persistables(self, place, port0, port1, dirname, var_name): + exe = fluid.Executor(place=place) + + vars_overview = VarsDistributed() + + orig_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[10, 8], + dtype="float32", + lod_level=0, + persistable=True) + + slice_0_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + lod_level=0, + persistable=True) + + slice_1_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + lod_level=0, + persistable=True) + + vars_overview.add_distributed_var( + origin_var=orig_var, + slice_var=slice_0_var, + block_id=0, + offset=0, + is_slice=True, + vtype="RemotePrefetch", + endpoint="{}:{}".format("127.0.0.1", port0)) + + vars_overview.add_distributed_var( + origin_var=orig_var, + slice_var=slice_1_var, + block_id=1, + offset=40, + is_slice=True, + vtype="RemotePrefetch", + endpoint="{}:{}".format("127.0.0.1", port1)) + + program = Program() + program._is_distributed = True + program._is_chief = True + program._parameters_on_pservers = vars_overview + + fluid.io.save_persistables(exe, dirname, program) + + def test_recv_save_op_remote(self): + # run pserver on CPU in sync mode + p0 = self._start_pserver(0, run_pserver) + self._wait_ps_ready(p0.pid) + port0 = self._get_pserver_port(p0.pid) + + p1 = self._start_pserver(1, run_pserver) + self._wait_ps_ready(p1.pid) + port1 = self._get_pserver_port(p1.pid) + + places = [core.CPUPlace()] + + param_dir = "./model_for_test_recv_save_op/" + param_name = "table" + + for place in places: + self._save_by_io_persistables(place, port0, port1, param_dir, + param_name) + + # raise SIGTERM to pserver + os.kill(p0.pid, signal.SIGINT) + p0.join() + os.kill(p1.pid, signal.SIGINT) + p1.join() + + self._load_slice_var(param_dir + param_name) + shutil.rmtree(param_dir) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_transpiler_ops.py b/python/paddle/fluid/tests/unittests/test_transpiler_ops.py new file mode 100644 index 0000000000..9512ae495d --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_transpiler_ops.py @@ -0,0 +1,143 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import traceback +import math +import collections + +import six +import unittest +import numpy as np + +import gc + +gc.set_debug(gc.DEBUG_COLLECTABLE) + +import paddle.fluid as fluid +from test_dist_transpiler import TranspilerTest + + +class TestFakeInit(TranspilerTest): + def net_conf(self): + dict_size, embedding_size, neg_num = 10000, 8, 5 + + input_word = fluid.layers.data( + name="input_word", shape=[1], dtype='int64', lod_level=1) + true_word = fluid.layers.data( + name='true_label', shape=[1], dtype='int64', lod_level=1) + neg_word = fluid.layers.data( + name="neg_label", shape=[1], dtype='int64', lod_level=1) + inputs = [input_word, true_word, neg_word] + + init_width = 0.5 / embedding_size + input_emb = fluid.layers.embedding( + input=inputs[0], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb', + initializer=fluid.initializer.Uniform(-init_width, init_width))) + + true_emb_w = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', + initializer=fluid.initializer.Constant(value=0.0))) + + true_emb_b = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', + initializer=fluid.initializer.Constant(value=0.0))) + + neg_word_reshape = fluid.layers.reshape(inputs[2], shape=[-1, 1]) + neg_word_reshape.stop_gradient = True + + neg_emb_w = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', learning_rate=1.0)) + + neg_emb_w_re = fluid.layers.reshape( + neg_emb_w, shape=[-1, neg_num, embedding_size]) + + neg_emb_b = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', learning_rate=1.0)) + + neg_emb_b_vec = fluid.layers.reshape(neg_emb_b, shape=[-1, neg_num]) + + true_logits = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + fluid.layers.elementwise_mul(input_emb, true_emb_w), + dim=1, + keep_dim=True), + true_emb_b) + + input_emb_re = fluid.layers.reshape( + input_emb, shape=[-1, 1, embedding_size]) + + neg_matmul = fluid.layers.matmul( + input_emb_re, neg_emb_w_re, transpose_y=True) + neg_matmul_re = fluid.layers.reshape(neg_matmul, shape=[-1, neg_num]) + neg_logits = fluid.layers.elementwise_add(neg_matmul_re, neg_emb_b_vec) + # nce loss + label_ones = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, 1], value=1.0, dtype='float32') + label_zeros = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, neg_num], value=0.0, dtype='float32') + + true_xent = fluid.layers.sigmoid_cross_entropy_with_logits(true_logits, + label_ones) + neg_xent = fluid.layers.sigmoid_cross_entropy_with_logits(neg_logits, + label_zeros) + cost = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + true_xent, dim=1), + fluid.layers.reduce_sum( + neg_xent, dim=1)) + avg_cost = fluid.layers.reduce_mean(cost) + + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=1.0, + decay_steps=2100, + decay_rate=0.1, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + + def transpiler_test_impl(self): + trainer, startup = self.get_trainer() + + fake_init_ops = [] + for op in startup.global_block().ops: + if op.type == "fake_init": + fake_init_ops.append(op) + + self.assertEqual(len(fake_init_ops), 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index bd6def9506..3579536c96 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -403,7 +403,7 @@ class DistributeTranspiler(object): def _get_all_remote_sparse_update_op(self, main_program): sparse_update_ops = [] - sparse_update_op_types = ["lookup_table", "nce", "hierarchical_sigmoid"] + sparse_update_op_types = ["lookup_table", "nce"] for op in main_program.global_block().ops: if op.type in sparse_update_op_types and op.attr( 'remote_prefetch') is True: @@ -607,6 +607,7 @@ class DistributeTranspiler(object): self.origin_program) # use_sparse_update_param_name -> split_height_section self.sparse_param_to_height_sections = dict() + self.need_delete_optimize_vars = [] # add distributed attrs to program self.origin_program._is_distributed = True @@ -861,6 +862,78 @@ class DistributeTranspiler(object): self._get_distributed_optimizer_vars() self.origin_program._parameters_on_pservers = self.vars_overview + def _get_sparse_table_names(self): + sparse_update_op_types = ["lookup_table", "nce"] + + sparse_table_names = [] + for op in self.origin_program.global_block().ops: + if op.type in sparse_update_op_types and op.attr( + 'is_sparse') is True: + sparse_table_names.append(op.input("W")[0]) + if op.type == "distributed_lookup_table": + sparse_table_names.append(op.input("W")[0]) + + if self.has_distributed_lookup_table: + sparse_table_names.append(self.table_name) + + return list(set(sparse_table_names)) + + def _fake_init_sparsetable(self, sparse_table_names): + # delete table init op + for table_name in sparse_table_names: + table_var = self.startup_program.global_block().vars[table_name] + table_param_init_op = [] + for op in self.startup_program.global_block().ops: + if table_name in op.output_arg_names: + table_param_init_op.append(op) + init_op_num = len(table_param_init_op) + if init_op_num != 1: + raise ValueError("table init op num should be 1, now is " + str( + init_op_num)) + table_init_op = table_param_init_op[0] + self.startup_program.global_block().append_op( + type="fake_init", + inputs={}, + outputs={"Out": table_var}, + attrs={"shape": table_init_op.attr('shape')}) + delete_ops(self.startup_program.global_block(), table_param_init_op) + + def _delete_trainer_optimizer(self, is_startup): + optimize_vars = [] + optimize_op_role_vars = [] + optimize_need_delete_vars = [] + + for op in self.optimize_ops: + optimize_vars.extend(op.input_arg_names) + optimize_op_role_vars.extend(op.attr("op_role_var")) + + optimize_vars = list(set(optimize_vars)) + optimize_op_role_vars = list(set(optimize_op_role_vars)) + + for var in optimize_vars: + if var not in optimize_op_role_vars: + optimize_need_delete_vars.append(var) + need_delete_optimize_vars = list(set(optimize_need_delete_vars)) + + if is_startup: + init_ops = [] + for var in need_delete_optimize_vars: + param_init_op = [] + for op in self.startup_program.global_block().ops: + if var in op.output_arg_names: + param_init_op.append(op) + init_ops.extend(param_init_op) + delete_ops(self.startup_program.global_block(), init_ops) + + for var in need_delete_optimize_vars: + if self.startup_program.global_block().has_var(var): + self.startup_program.global_block()._remove_var(var) + else: + delete_ops(self.origin_program.global_block(), self.optimize_ops) + for var in need_delete_optimize_vars: + if self.origin_program.global_block().has_var(var): + self.origin_program.global_block()._remove_var(var) + def get_trainer_program(self, wait_port=True): """ Get transpiled trainer side program. The program on trainer side compared with origin program @@ -891,31 +964,16 @@ class DistributeTranspiler(object): # remove optimize ops and add a send op to main_program # FIXME(typhoonzero): Also ops like clip_gradient, lrn_decay? + self._delete_trainer_optimizer(is_startup=True) + sparse_table_names = self._get_sparse_table_names() + self._fake_init_sparsetable(sparse_table_names) + lr_ops = self._get_lr_ops() - delete_ops(self.origin_program.global_block(), self.optimize_ops) delete_ops(self.origin_program.global_block(), lr_ops) - - # delete table init op - if self.has_distributed_lookup_table: - table_var = self.startup_program.global_block().vars[ - self.table_name] - table_param_init_op = [] - for op in self.startup_program.global_block().ops: - if self.table_name in op.output_arg_names: - table_param_init_op.append(op) - init_op_num = len(table_param_init_op) - if init_op_num != 1: - raise ValueError("table init op num should be 1, now is " + str( - init_op_num)) - table_init_op = table_param_init_op[0] - self.startup_program.global_block().append_op( - type="fake_init", - inputs={}, - outputs={"Out": table_var}, - attrs={"shape": table_init_op.attr('shape')}) - delete_ops(self.startup_program.global_block(), table_param_init_op) + self._delete_trainer_optimizer(is_startup=False) self.origin_program.__str__() + self.startup_program.__str__() if wait_port: wait_server_ready(self.pserver_endpoints) @@ -937,8 +995,14 @@ class DistributeTranspiler(object): # FIXME(gongwb): delete not need ops. # note that: some parameter is not trainable and those ops can't be deleted. + sparse_table_names = self._get_sparse_table_names() + + # self._fake_init_sparsetable(sparse_table_names) + #self._delete_trainer_optimizer(is_startup=True) for varname, splited_var in six.iteritems(self.param_var_mapping): + if varname in sparse_table_names: + continue # Get the eplist of recv vars eps = [] for var in splited_var: @@ -980,6 +1044,8 @@ class DistributeTranspiler(object): }) for varname, splited_var in six.iteritems(self.param_var_mapping): + if varname in sparse_table_names: + continue # add concat ops to merge splited parameters received from parameter servers. if len(splited_var) <= 1: continue -- GitLab